
root / lib / jqueue.py @ d7fd1f28


1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the job queue handling.
23

24
Locking: there's a single, large lock in the L{JobQueue} class. It's
25
used by all other classes in this module.
26

27
@var JOBQUEUE_THREADS: the number of worker threads we start for
28
    processing jobs
29

30
"""
31

    
32
import os
33
import logging
34
import threading
35
import errno
36
import re
37
import time
38
import weakref
39

    
40
from ganeti import constants
41
from ganeti import serializer
42
from ganeti import workerpool
43
from ganeti import opcodes
44
from ganeti import errors
45
from ganeti import mcpu
46
from ganeti import utils
47
from ganeti import jstore
48
from ganeti import rpc
49

    
50

    
51
JOBQUEUE_THREADS = 25
52
JOBS_PER_ARCHIVE_DIRECTORY = 10000
53

    
54

    
55
class CancelJob(Exception):
56
  """Special exception to cancel a job.
57

58
  """
59

    
60

    
61
def TimeStampNow():
62
  """Returns the current timestamp.
63

64
  @rtype: tuple
65
  @return: the current time in the (seconds, microseconds) format
66

67
  """
68
  return utils.SplitTime(time.time())
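# Illustrative example (not part of the original code): per the docstring
# above, utils.SplitTime splits a float timestamp into (seconds, microseconds),
# so at Unix time 1234567890.123456 this would return roughly
# (1234567890, 123456).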
69

    
70

    
71
class _QueuedOpCode(object):
72
  """Encasulates an opcode object.
73

74
  @ivar log: holds the execution log and consists of tuples
75
      of the form C{(log_serial, timestamp, level, message)}
76
  @ivar input: the OpCode we encapsulate
77
  @ivar status: the current status
78
  @ivar result: the result of the LU execution
79
  @ivar start_timestamp: timestamp for the start of the execution
80
  @ivar end_timestamp: timestamp for the end of the execution
81

82
  """
83
  def __init__(self, op):
84
    """Constructor for the _QuededOpCode.
85

86
    @type op: L{opcodes.OpCode}
87
    @param op: the opcode we encapsulate
88

89
    """
90
    self.input = op
91
    self.status = constants.OP_STATUS_QUEUED
92
    self.result = None
93
    self.log = []
94
    self.start_timestamp = None
95
    self.end_timestamp = None
96

    
97
  @classmethod
98
  def Restore(cls, state):
99
    """Restore the _QueuedOpCode from the serialized form.
100

101
    @type state: dict
102
    @param state: the serialized state
103
    @rtype: _QueuedOpCode
104
    @return: a new _QueuedOpCode instance
105

106
    """
107
    obj = _QueuedOpCode.__new__(cls)
108
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
109
    obj.status = state["status"]
110
    obj.result = state["result"]
111
    obj.log = state["log"]
112
    obj.start_timestamp = state.get("start_timestamp", None)
113
    obj.end_timestamp = state.get("end_timestamp", None)
114
    return obj
115

    
116
  def Serialize(self):
117
    """Serializes this _QueuedOpCode.
118

119
    @rtype: dict
120
    @return: the dictionary holding the serialized state
121

122
    """
123
    return {
124
      "input": self.input.__getstate__(),
125
      "status": self.status,
126
      "result": self.result,
127
      "log": self.log,
128
      "start_timestamp": self.start_timestamp,
129
      "end_timestamp": self.end_timestamp,
130
      }
131

    
132

    
133
class _QueuedJob(object):
134
  """In-memory job representation.
135

136
  This is what we use to track the user-submitted jobs. Locking must
137
  be taken care of by users of this class.
138

139
  @type queue: L{JobQueue}
140
  @ivar queue: the parent queue
141
  @ivar id: the job ID
142
  @type ops: list
143
  @ivar ops: the list of _QueuedOpCode that constitute the job
144
  @type run_op_index: int
145
  @ivar run_op_index: the currently executing opcode, or -1 if
146
      we didn't yet start executing
147
  @type log_serial: int
148
  @ivar log_serial: holds the index for the next log entry
149
  @ivar received_timestamp: the timestamp for when the job was received
150
  @ivar start_timestamp: the timestamp for start of execution
151
  @ivar end_timestamp: the timestamp for end of execution
152
  @ivar change: a Condition variable we use for waiting for job changes
153

154
  """
155
  def __init__(self, queue, job_id, ops):
156
    """Constructor for the _QueuedJob.
157

158
    @type queue: L{JobQueue}
159
    @param queue: our parent queue
160
    @type job_id: job_id
161
    @param job_id: our job id
162
    @type ops: list
163
    @param ops: the list of opcodes we hold, which will be encapsulated
164
        in _QueuedOpCodes
165

166
    """
167
    if not ops:
168
      # TODO: use a better exception
169
      raise Exception("No opcodes")
170

    
171
    self.queue = queue
172
    self.id = job_id
173
    self.ops = [_QueuedOpCode(op) for op in ops]
174
    self.run_op_index = -1
175
    self.log_serial = 0
176
    self.received_timestamp = TimeStampNow()
177
    self.start_timestamp = None
178
    self.end_timestamp = None
179

    
180
    # Condition to wait for changes
181
    self.change = threading.Condition(self.queue._lock)
182

    
183
  @classmethod
184
  def Restore(cls, queue, state):
185
    """Restore a _QueuedJob from serialized state:
186

187
    @type queue: L{JobQueue}
188
    @param queue: to which queue the restored job belongs
189
    @type state: dict
190
    @param state: the serialized state
191
    @rtype: _QueuedJob
192
    @return: the restored _QueuedJob instance
193

194
    """
195
    obj = _QueuedJob.__new__(cls)
196
    obj.queue = queue
197
    obj.id = state["id"]
198
    obj.run_op_index = state["run_op_index"]
199
    obj.received_timestamp = state.get("received_timestamp", None)
200
    obj.start_timestamp = state.get("start_timestamp", None)
201
    obj.end_timestamp = state.get("end_timestamp", None)
202

    
203
    obj.ops = []
204
    obj.log_serial = 0
205
    for op_state in state["ops"]:
206
      op = _QueuedOpCode.Restore(op_state)
207
      for log_entry in op.log:
208
        obj.log_serial = max(obj.log_serial, log_entry[0])
209
      obj.ops.append(op)
210

    
211
    # Condition to wait for changes
212
    obj.change = threading.Condition(obj.queue._lock)
213

    
214
    return obj
215

    
216
  def Serialize(self):
217
    """Serialize the _JobQueue instance.
218

219
    @rtype: dict
220
    @return: the serialized state
221

222
    """
223
    return {
224
      "id": self.id,
225
      "ops": [op.Serialize() for op in self.ops],
226
      "run_op_index": self.run_op_index,
227
      "start_timestamp": self.start_timestamp,
228
      "end_timestamp": self.end_timestamp,
229
      "received_timestamp": self.received_timestamp,
230
      }
231

    
232
  def CalcStatus(self):
233
    """Compute the status of this job.
234

235
    This function iterates over all the _QueuedOpCodes in the job and
236
    based on their status, computes the job status.
237

238
    The algorithm is:
239
      - if we find a cancelled opcode, or one that finished with error, the job
240
        status will be the same
241
      - otherwise, the last opcode with the status one of:
242
          - waitlock
243
          - canceling
244
          - running
245

246
        will determine the job status
247

248
      - otherwise, it means either all opcodes are queued or all succeeded,
249
        and the job status will be the same
250

251
    @return: the job status
252

253
    """
254
    status = constants.JOB_STATUS_QUEUED
255

    
256
    all_success = True
257
    for op in self.ops:
258
      if op.status == constants.OP_STATUS_SUCCESS:
259
        continue
260

    
261
      all_success = False
262

    
263
      if op.status == constants.OP_STATUS_QUEUED:
264
        pass
265
      elif op.status == constants.OP_STATUS_WAITLOCK:
266
        status = constants.JOB_STATUS_WAITLOCK
267
      elif op.status == constants.OP_STATUS_RUNNING:
268
        status = constants.JOB_STATUS_RUNNING
269
      elif op.status == constants.OP_STATUS_CANCELING:
270
        status = constants.JOB_STATUS_CANCELING
271
        break
272
      elif op.status == constants.OP_STATUS_ERROR:
273
        status = constants.JOB_STATUS_ERROR
274
        # The whole job fails if one opcode failed
275
        break
276
      elif op.status == constants.OP_STATUS_CANCELED:
277
        status = constants.JOB_STATUS_CANCELED
278
        break
279

    
280
    if all_success:
281
      status = constants.JOB_STATUS_SUCCESS
282

    
283
    return status
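  # Illustrative example (not part of the original code): opcode statuses
  # [SUCCESS, RUNNING, QUEUED] yield JOB_STATUS_RUNNING, while
  # [SUCCESS, ERROR, QUEUED] yield JOB_STATUS_ERROR, since an error status
  # terminates the scan early.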
284

    
285
  def GetLogEntries(self, newer_than):
286
    """Selectively returns the log entries.
287

288
    @type newer_than: None or int
289
    @param newer_than: if this is None, return all log entries,
290
        otherwise return only the log entries with serial higher
291
        than this value
292
    @rtype: list
293
    @return: the list of the log entries selected
294

295
    """
296
    if newer_than is None:
297
      serial = -1
298
    else:
299
      serial = newer_than
300

    
301
    entries = []
302
    for op in self.ops:
303
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))
304

    
305
    return entries
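  # Illustrative example (not part of the original code): with log serials
  # 1..5 recorded across the opcodes, GetLogEntries(3) returns only the
  # entries with serial 4 and 5, while GetLogEntries(None) returns all five.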
306

    
307

    
308
class _JobQueueWorker(workerpool.BaseWorker):
309
  """The actual job workers.
310

311
  """
312
  def _NotifyStart(self):
313
    """Mark the opcode as running, not lock-waiting.
314

315
    This is called from the mcpu code as a notifier function, when the
316
    LU is finally about to start the Exec() method. Of course, to have
317
    end-user visible results, the opcode must be initially (before
318
    calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
319

320
    """
321
    assert self.queue, "Queue attribute is missing"
322
    assert self.opcode, "Opcode attribute is missing"
323

    
324
    self.queue.acquire()
325
    try:
326
      assert self.opcode.status in (constants.OP_STATUS_WAITLOCK,
327
                                    constants.OP_STATUS_CANCELING)
328

    
329
      # Cancel here if we were asked to
330
      if self.opcode.status == constants.OP_STATUS_CANCELING:
331
        raise CancelJob()
332

    
333
      self.opcode.status = constants.OP_STATUS_RUNNING
334
    finally:
335
      self.queue.release()
336

    
337
  def RunTask(self, job):
338
    """Job executor.
339

340
    This function processes a job. It is closely tied to the _QueuedJob and
341
    _QueuedOpCode classes.
342

343
    @type job: L{_QueuedJob}
344
    @param job: the job to be processed
345

346
    """
347
    logging.debug("Worker %s processing job %s",
348
                  self.worker_id, job.id)
349
    proc = mcpu.Processor(self.pool.queue.context)
350
    self.queue = queue = job.queue
351
    try:
352
      try:
353
        count = len(job.ops)
354
        for idx, op in enumerate(job.ops):
355
          try:
356
            logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
357

    
358
            queue.acquire()
359
            try:
360
              assert op.status == constants.OP_STATUS_QUEUED
361
              job.run_op_index = idx
362
              op.status = constants.OP_STATUS_WAITLOCK
363
              op.result = None
364
              op.start_timestamp = TimeStampNow()
365
              if idx == 0: # first opcode
366
                job.start_timestamp = op.start_timestamp
367
              queue.UpdateJobUnlocked(job)
368

    
369
              input_opcode = op.input
370
            finally:
371
              queue.release()
372

    
373
            def _Log(*args):
374
              """Append a log entry.
375

376
              """
377
              assert len(args) < 3
378

    
379
              if len(args) == 1:
380
                log_type = constants.ELOG_MESSAGE
381
                log_msg = args[0]
382
              else:
383
                log_type, log_msg = args
384

    
385
              # The time is split to make serialization easier and not lose
386
              # precision.
387
              timestamp = utils.SplitTime(time.time())
388

    
389
              queue.acquire()
390
              try:
391
                job.log_serial += 1
392
                op.log.append((job.log_serial, timestamp, log_type, log_msg))
393

    
394
                job.change.notifyAll()
395
              finally:
396
                queue.release()
397

    
398
            # Make sure not to hold lock while _Log is called
399
            self.opcode = op
400
            result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)
401

    
402
            queue.acquire()
403
            try:
404
              op.status = constants.OP_STATUS_SUCCESS
405
              op.result = result
406
              op.end_timestamp = TimeStampNow()
407
              queue.UpdateJobUnlocked(job)
408
            finally:
409
              queue.release()
410

    
411
            logging.debug("Op %s/%s: Successfully finished %s",
412
                          idx + 1, count, op)
413
          except CancelJob:
414
            # Will be handled further up
415
            raise
416
          except Exception, err:
417
            queue.acquire()
418
            try:
419
              try:
420
                op.status = constants.OP_STATUS_ERROR
421
                op.result = str(err)
422
                op.end_timestamp = TimeStampNow()
423
                logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
424
              finally:
425
                queue.UpdateJobUnlocked(job)
426
            finally:
427
              queue.release()
428
            raise
429

    
430
      except CancelJob:
431
        queue.acquire()
432
        try:
433
          queue.CancelJobUnlocked(job)
434
        finally:
435
          queue.release()
436
      except errors.GenericError, err:
437
        logging.exception("Ganeti exception")
438
      except:
439
        logging.exception("Unhandled exception")
440
    finally:
441
      queue.acquire()
442
      try:
443
        try:
444
          job.run_op_index = -1
445
          job.end_timestamp = TimeStampNow()
446
          queue.UpdateJobUnlocked(job)
447
        finally:
448
          job_id = job.id
449
          status = job.CalcStatus()
450
      finally:
451
        queue.release()
452
      logging.debug("Worker %s finished job %s, status = %s",
453
                    self.worker_id, job_id, status)
454

    
455

    
456
class _JobQueueWorkerPool(workerpool.WorkerPool):
457
  """Simple class implementing a job-processing workerpool.
458

459
  """
460
  def __init__(self, queue):
461
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
462
                                              _JobQueueWorker)
463
    self.queue = queue
464

    
465

    
466
class JobQueue(object):
467
  """Quue used to manaage the jobs.
468

469
  @cvar _RE_JOB_FILE: regex matching the valid job file names
470

471
  """
472
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
473

    
474
  def _RequireOpenQueue(fn):
475
    """Decorator for "public" functions.
476

477
    This decorator should be used for all 'public' functions. That is,
478
    functions usually called from other classes.
479

480
    @warning: Use this decorator only after utils.LockedMethod!
481

482
    Example::
483
      @utils.LockedMethod
484
      @_RequireOpenQueue
485
      def Example(self):
486
        pass
487

488
    """
489
    def wrapper(self, *args, **kwargs):
490
      assert self._queue_lock is not None, "Queue should be open"
491
      return fn(self, *args, **kwargs)
492
    return wrapper
493

    
494
  def __init__(self, context):
495
    """Constructor for JobQueue.
496

497
    The constructor will initialize the job queue object and then
498
    start loading the current jobs from disk, either for starting them
499
    (if they were queued) or for aborting them (if they were already
500
    running).
501

502
    @type context: GanetiContext
503
    @param context: the context object for access to the configuration
504
        data and other ganeti objects
505

506
    """
507
    self.context = context
508
    self._memcache = weakref.WeakValueDictionary()
509
    self._my_hostname = utils.HostInfo().name
510

    
511
    # Locking
512
    self._lock = threading.Lock()
513
    self.acquire = self._lock.acquire
514
    self.release = self._lock.release
515

    
516
    # Initialize
517
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)
518

    
519
    # Read serial file
520
    self._last_serial = jstore.ReadSerial()
521
    assert self._last_serial is not None, ("Serial file was modified between"
522
                                           " check in jstore and here")
523

    
524
    # Get initial list of nodes
525
    self._nodes = dict((n.name, n.primary_ip)
526
                       for n in self.context.cfg.GetAllNodesInfo().values()
527
                       if n.master_candidate)
528

    
529
    # Remove master node
530
    try:
531
      del self._nodes[self._my_hostname]
532
    except KeyError:
533
      pass
534

    
535
    # TODO: Check consistency across nodes
536

    
537
    # Setup worker pool
538
    self._wpool = _JobQueueWorkerPool(self)
539
    try:
540
      # We need to lock here because WorkerPool.AddTask() may start a job while
541
      # we're still doing our work.
542
      self.acquire()
543
      try:
544
        logging.info("Inspecting job queue")
545

    
546
        all_job_ids = self._GetJobIDsUnlocked()
547
        jobs_count = len(all_job_ids)
548
        lastinfo = time.time()
549
        for idx, job_id in enumerate(all_job_ids):
550
          # Give an update every 1000 jobs or 10 seconds
551
          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
552
              idx == (jobs_count - 1)):
553
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
554
                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
555
            lastinfo = time.time()
556

    
557
          job = self._LoadJobUnlocked(job_id)
558

    
559
          # a failure in loading the job can cause 'None' to be returned
560
          if job is None:
561
            continue
562

    
563
          status = job.CalcStatus()
564

    
565
          if status in (constants.JOB_STATUS_QUEUED, ):
566
            self._wpool.AddTask(job)
567

    
568
          elif status in (constants.JOB_STATUS_RUNNING,
569
                          constants.JOB_STATUS_WAITLOCK,
570
                          constants.JOB_STATUS_CANCELING):
571
            logging.warning("Unfinished job %s found: %s", job.id, job)
572
            try:
573
              for op in job.ops:
574
                op.status = constants.OP_STATUS_ERROR
575
                op.result = "Unclean master daemon shutdown"
576
            finally:
577
              self.UpdateJobUnlocked(job)
578

    
579
        logging.info("Job queue inspection finished")
580
      finally:
581
        self.release()
582
    except:
583
      self._wpool.TerminateWorkers()
584
      raise
585

    
586
  @utils.LockedMethod
587
  @_RequireOpenQueue
588
  def AddNode(self, node):
589
    """Register a new node with the queue.
590

591
    @type node: L{objects.Node}
592
    @param node: the node object to be added
593

594
    """
595
    node_name = node.name
596
    assert node_name != self._my_hostname
597

    
598
    # Clean queue directory on added node
599
    rpc.RpcRunner.call_jobqueue_purge(node_name)
600

    
601
    if not node.master_candidate:
602
      # remove if existing, ignoring errors
603
      self._nodes.pop(node_name, None)
604
      # and skip the replication of the job ids
605
      return
606

    
607
    # Upload the whole queue excluding archived jobs
608
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
609

    
610
    # Upload current serial file
611
    files.append(constants.JOB_QUEUE_SERIAL_FILE)
612

    
613
    for file_name in files:
614
      # Read file content
615
      fd = open(file_name, "r")
616
      try:
617
        content = fd.read()
618
      finally:
619
        fd.close()
620

    
621
      result = rpc.RpcRunner.call_jobqueue_update([node_name],
622
                                                  [node.primary_ip],
623
                                                  file_name, content)
624
      if not result[node_name]:
625
        logging.error("Failed to upload %s to %s", file_name, node_name)
626

    
627
    self._nodes[node_name] = node.primary_ip
628

    
629
  @utils.LockedMethod
630
  @_RequireOpenQueue
631
  def RemoveNode(self, node_name):
632
    """Callback called when removing nodes from the cluster.
633

634
    @type node_name: str
635
    @param node_name: the name of the node to remove
636

637
    """
638
    try:
639
      # The queue is removed by the "leave node" RPC call.
640
      del self._nodes[node_name]
641
    except KeyError:
642
      pass
643

    
644
  def _CheckRpcResult(self, result, nodes, failmsg):
645
    """Verifies the status of an RPC call.
646

647
    Since we aim to keep consistency should this node (the current
648
    master) fail, we will log errors if our rpc calls fail, and especially
649
    log the case when more than half of the nodes fail.
650

651
    @param result: the data as returned from the rpc call
652
    @type nodes: list
653
    @param nodes: the list of nodes we made the call to
654
    @type failmsg: str
655
    @param failmsg: the identifier to be used for logging
656

657
    """
658
    failed = []
659
    success = []
660

    
661
    for node in nodes:
662
      if result[node]:
663
        success.append(node)
664
      else:
665
        failed.append(node)
666

    
667
    if failed:
668
      logging.error("%s failed on %s", failmsg, ", ".join(failed))
669

    
670
    # +1 for the master node
671
    if (len(success) + 1) < len(failed):
672
      # TODO: Handle failing nodes
673
      logging.error("More than half of the nodes failed")
674

    
675
  def _GetNodeIp(self):
676
    """Helper for returning the node name/ip list.
677

678
    @rtype: (list, list)
679
    @return: a tuple of two lists, the first one with the node
680
        names and the second one with the node addresses
681

682
    """
683
    name_list = self._nodes.keys()
684
    addr_list = [self._nodes[name] for name in name_list]
685
    return name_list, addr_list
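  # Illustrative example (not part of the original code): if self._nodes is
  # {"node2.example.com": "192.0.2.2"}, this returns
  # (["node2.example.com"], ["192.0.2.2"]).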
686

    
687
  def _WriteAndReplicateFileUnlocked(self, file_name, data):
688
    """Writes a file locally and then replicates it to all nodes.
689

690
    This function will replace the contents of a file on the local
691
    node and then replicate it to all the other nodes we have.
692

693
    @type file_name: str
694
    @param file_name: the path of the file to be replicated
695
    @type data: str
696
    @param data: the new contents of the file
697

698
    """
699
    utils.WriteFile(file_name, data=data)
700

    
701
    names, addrs = self._GetNodeIp()
702
    result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
703
    self._CheckRpcResult(result, self._nodes,
704
                         "Updating %s" % file_name)
705

    
706
  def _RenameFilesUnlocked(self, rename):
707
    """Renames a file locally and then replicate the change.
708

709
    This function will rename a file in the local queue directory
710
    and then replicate this rename to all the other nodes we have.
711

712
    @type rename: list of (old, new)
713
    @param rename: List containing tuples mapping old to new names
714

715
    """
716
    for old, new in rename:
717
      utils.RenameFile(old, new, mkdir=True)
718

    
719
      names, addrs = self._GetNodeIp()
720
      result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, old, new)
721
      self._CheckRpcResult(result, self._nodes,
722
                           "Moving %s to %s" % (old, new))
723

    
724
  def _FormatJobID(self, job_id):
725
    """Convert a job ID to string format.
726

727
    Currently this just does C{str(job_id)} after performing some
728
    checks, but if we want to change the job id format this will
729
    abstract this change.
730

731
    @type job_id: int or long
732
    @param job_id: the numeric job id
733
    @rtype: str
734
    @return: the formatted job id
735

736
    """
737
    if not isinstance(job_id, (int, long)):
738
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
739
    if job_id < 0:
740
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)
741

    
742
    return str(job_id)
743

    
744
  @classmethod
745
  def _GetArchiveDirectory(cls, job_id):
746
    """Returns the archive directory for a job.
747

748
    @type job_id: str
749
    @param job_id: Job identifier
750
    @rtype: str
751
    @return: Directory name
752

753
    """
754
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
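  # Illustrative example (not part of the original code): with
  # JOBS_PER_ARCHIVE_DIRECTORY = 10000, job id "12345" maps to directory "1"
  # and job id "42" maps to directory "0".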
755

    
756
  def _NewSerialUnlocked(self):
757
    """Generates a new job identifier.
758

759
    Job identifiers are unique during the lifetime of a cluster.
760

761
    @rtype: str
762
    @return: a string representing the job identifier.
763

764
    """
765
    # New number
766
    serial = self._last_serial + 1
767

    
768
    # Write to file
769
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
770
                                        "%s\n" % serial)
771

    
772
    # Keep it only if we were able to write the file
773
    self._last_serial = serial
774

    
775
    return self._FormatJobID(serial)
776

    
777
  @staticmethod
778
  def _GetJobPath(job_id):
779
    """Returns the job file for a given job id.
780

781
    @type job_id: str
782
    @param job_id: the job identifier
783
    @rtype: str
784
    @return: the path to the job file
785

786
    """
787
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
788

    
789
  @classmethod
790
  def _GetArchivedJobPath(cls, job_id):
791
    """Returns the archived job file for a give job id.
792

793
    @type job_id: str
794
    @param job_id: the job identifier
795
    @rtype: str
796
    @return: the path to the archived job file
797

798
    """
799
    path = "%s/job-%s" % (cls._GetArchiveDirectory(job_id), job_id)
800
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, path)
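  # Illustrative example (not part of the original code), assuming
  # JOB_QUEUE_ARCHIVE_DIR points at the archive directory: job id "12345"
  # resolves to "<archive dir>/1/job-12345".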
801

    
802
  @classmethod
803
  def _ExtractJobID(cls, name):
804
    """Extract the job id from a filename.
805

806
    @type name: str
807
    @param name: the job filename
808
    @rtype: job id or None
809
    @return: the job id corresponding to the given filename,
810
        or None if the filename does not represent a valid
811
        job file
812

813
    """
814
    m = cls._RE_JOB_FILE.match(name)
815
    if m:
816
      return m.group(1)
817
    else:
818
      return None
819

    
820
  def _GetJobIDsUnlocked(self, archived=False):
821
    """Return all known job IDs.
822

823
    If the parameter archived is True, archived job IDs will be
824
    included. Currently this argument is unused.
825

826
    The method only looks at disk because it's a requirement that all
827
    jobs are present on disk (so in the _memcache we don't have any
828
    extra IDs).
829

830
    @rtype: list
831
    @return: the list of job IDs
832

833
    """
834
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
835
    jlist = utils.NiceSort(jlist)
836
    return jlist
837

    
838
  def _ListJobFiles(self):
839
    """Returns the list of current job files.
840

841
    @rtype: list
842
    @return: the list of job file names
843

844
    """
845
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
846
            if self._RE_JOB_FILE.match(name)]
847

    
848
  def _LoadJobUnlocked(self, job_id):
849
    """Loads a job from the disk or memory.
850

851
    Given a job id, this will return the cached job object if
852
    existing, or try to load the job from the disk. If loading from
853
    disk, it will also add the job to the cache.
854

855
    @param job_id: the job id
856
    @rtype: L{_QueuedJob} or None
857
    @return: either None or the job object
858

859
    """
860
    job = self._memcache.get(job_id, None)
861
    if job:
862
      logging.debug("Found job %s in memcache", job_id)
863
      return job
864

    
865
    filepath = self._GetJobPath(job_id)
866
    logging.debug("Loading job from %s", filepath)
867
    try:
868
      fd = open(filepath, "r")
869
    except IOError, err:
870
      if err.errno in (errno.ENOENT, ):
871
        return None
872
      raise
873
    try:
874
      data = serializer.LoadJson(fd.read())
875
    finally:
876
      fd.close()
877

    
878
    try:
879
      job = _QueuedJob.Restore(self, data)
880
    except Exception, err:
881
      new_path = self._GetArchivedJobPath(job_id)
882
      if filepath == new_path:
883
        # job already archived (future case)
884
        logging.exception("Can't parse job %s", job_id)
885
      else:
886
        # non-archived case
887
        logging.exception("Can't parse job %s, will archive.", job_id)
888
        self._RenameFilesUnlocked([(filepath, new_path)])
889
      return None
890

    
891
    self._memcache[job_id] = job
892
    logging.debug("Added job %s to the cache", job_id)
893
    return job
894

    
895
  def _GetJobsUnlocked(self, job_ids):
896
    """Return a list of jobs based on their IDs.
897

898
    @type job_ids: list
899
    @param job_ids: either an empty list (meaning all jobs),
900
        or a list of job IDs
901
    @rtype: list
902
    @return: the list of job objects
903

904
    """
905
    if not job_ids:
906
      job_ids = self._GetJobIDsUnlocked()
907

    
908
    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
909

    
910
  @staticmethod
911
  def _IsQueueMarkedDrain():
912
    """Check if the queue is marked from drain.
913

914
    This currently uses the queue drain file, which makes it a
915
    per-node flag. In the future this can be moved to the config file.
916

917
    @rtype: boolean
918
    @return: True if the job queue is marked for draining
919

920
    """
921
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
922

    
923
  @staticmethod
924
  def SetDrainFlag(drain_flag):
925
    """Sets the drain flag for the queue.
926

927
    This is similar to the function L{backend.JobQueueSetDrainFlag},
928
    and in the future we might merge them.
929

930
    @type drain_flag: boolean
931
    @param drain_flag: whether to set or unset the drain flag
932

933
    """
934
    if drain_flag:
935
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
936
    else:
937
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
938
    return True
939

    
940
  @utils.LockedMethod
941
  @_RequireOpenQueue
942
  def SubmitJob(self, ops):
943
    """Create and store a new job.
944

945
    This enters the job into our job queue and also puts it on the new
946
    queue, in order for it to be picked up by the queue processors.
947

948
    @type ops: list
949
    @param ops: The list of OpCodes that will become the new job.
950
    @rtype: job ID
951
    @return: the job ID of the newly created job
952
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
953

954
    """
955
    if self._IsQueueMarkedDrain():
956
      raise errors.JobQueueDrainError()
957

    
958
    # Check job queue size
959
    size = len(self._ListJobFiles())
960
    if size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
961
      # TODO: Autoarchive jobs. Make sure it's not done on every job
962
      # submission, though.
963
      #size = ...
964
      pass
965

    
966
    if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
967
      raise errors.JobQueueFull()
968

    
969
    # Get job identifier
970
    job_id = self._NewSerialUnlocked()
971
    job = _QueuedJob(self, job_id, ops)
972

    
973
    # Write to disk
974
    self.UpdateJobUnlocked(job)
975

    
976
    logging.debug("Adding new job %s to the cache", job_id)
977
    self._memcache[job_id] = job
978

    
979
    # Add to worker pool
980
    self._wpool.AddTask(job)
981

    
982
    return job.id
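  # Typical usage (illustrative sketch; the opcode shown is only an example
  # and may differ from real callers):
  #   job_id = queue.SubmitJob([opcodes.OpTestDelay(duration=5)])
  #   # job_id can then be passed to WaitForJobChanges or QueryJobs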
983

    
984
  @_RequireOpenQueue
985
  def UpdateJobUnlocked(self, job):
986
    """Update a job's on disk storage.
987

988
    After a job has been modified, this function needs to be called in
989
    order to write the changes to disk and replicate them to the other
990
    nodes.
991

992
    @type job: L{_QueuedJob}
993
    @param job: the changed job
994

995
    """
996
    filename = self._GetJobPath(job.id)
997
    data = serializer.DumpJson(job.Serialize(), indent=False)
998
    logging.debug("Writing job %s to %s", job.id, filename)
999
    self._WriteAndReplicateFileUnlocked(filename, data)
1000

    
1001
    # Notify waiters about potential changes
1002
    job.change.notifyAll()
1003

    
1004
  @utils.LockedMethod
1005
  @_RequireOpenQueue
1006
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
1007
                        timeout):
1008
    """Waits for changes in a job.
1009

1010
    @type job_id: string
1011
    @param job_id: Job identifier
1012
    @type fields: list of strings
1013
    @param fields: Which fields to check for changes
1014
    @type prev_job_info: list or None
1015
    @param prev_job_info: Last job information returned
1016
    @type prev_log_serial: int
1017
    @param prev_log_serial: Last job message serial number
1018
    @type timeout: float
1019
    @param timeout: maximum time to wait
1020
    @rtype: tuple (job info, log entries)
1021
    @return: a tuple of the job information as required via
1022
        the fields parameter, and the log entries as a list
1023

1024
        if the job has not changed and the timeout has expired,
1025
        we instead return a special value,
1026
        L{constants.JOB_NOTCHANGED}, which should be interpreted
1027
        as such by the clients
1028

1029
    """
1030
    logging.debug("Waiting for changes in job %s", job_id)
1031
    end_time = time.time() + timeout
1032
    while True:
1033
      delta_time = end_time - time.time()
1034
      if delta_time < 0:
1035
        return constants.JOB_NOTCHANGED
1036

    
1037
      job = self._LoadJobUnlocked(job_id)
1038
      if not job:
1039
        logging.debug("Job %s not found", job_id)
1040
        break
1041

    
1042
      status = job.CalcStatus()
1043
      job_info = self._GetJobInfoUnlocked(job, fields)
1044
      log_entries = job.GetLogEntries(prev_log_serial)
1045

    
1046
      # Serializing and deserializing data can cause type changes (e.g. from
1047
      # tuple to list) or precision loss. We're doing it here so that we get
1048
      # the same modifications as the data received from the client. Without
1049
      # this, the comparison afterwards might fail without the data being
1050
      # significantly different.
1051
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
1052
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
1053

    
1054
      if status not in (constants.JOB_STATUS_QUEUED,
1055
                        constants.JOB_STATUS_RUNNING,
1056
                        constants.JOB_STATUS_WAITLOCK):
1057
        # Don't even try to wait if the job is no longer running, there will be
1058
        # no changes.
1059
        break
1060

    
1061
      if (prev_job_info != job_info or
1062
          (log_entries and prev_log_serial != log_entries[0][0])):
1063
        break
1064

    
1065
      logging.debug("Waiting again")
1066

    
1067
      # Release the queue lock while waiting
1068
      job.change.wait(delta_time)
1069

    
1070
    logging.debug("Job %s changed", job_id)
1071

    
1072
    return (job_info, log_entries)
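  # Illustrative call (a sketch, not from the original code):
  #   WaitForJobChanges(job_id, ["status"], prev_info, 0, 10.0)
  # blocks for up to ten seconds and returns either an updated
  # (job info, log entries) tuple or constants.JOB_NOTCHANGED on timeout.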
1073

    
1074
  @utils.LockedMethod
1075
  @_RequireOpenQueue
1076
  def CancelJob(self, job_id):
1077
    """Cancels a job.
1078

1079
    This will only succeed if the job has not started yet.
1080

1081
    @type job_id: string
1082
    @param job_id: job ID of job to be cancelled.
1083

1084
    """
1085
    logging.info("Cancelling job %s", job_id)
1086

    
1087
    job = self._LoadJobUnlocked(job_id)
1088
    if not job:
1089
      logging.debug("Job %s not found", job_id)
1090
      return (False, "Job %s not found" % job_id)
1091

    
1092
    job_status = job.CalcStatus()
1093

    
1094
    if job_status not in (constants.JOB_STATUS_QUEUED,
1095
                          constants.JOB_STATUS_WAITLOCK):
1096
      logging.debug("Job %s is no longer in the queue", job.id)
1097
      return (False, "Job %s is no longer in the queue" % job.id)
1098

    
1099
    if job_status == constants.JOB_STATUS_QUEUED:
1100
      self.CancelJobUnlocked(job)
1101
      return (True, "Job %s canceled" % job.id)
1102

    
1103
    elif job_status == constants.JOB_STATUS_WAITLOCK:
1104
      # The worker will notice the new status and cancel the job
1105
      try:
1106
        for op in job.ops:
1107
          op.status = constants.OP_STATUS_CANCELING
1108
      finally:
1109
        self.UpdateJobUnlocked(job)
1110
      return (True, "Job %s will be canceled" % job.id)
1111

    
1112
  @_RequireOpenQueue
1113
  def CancelJobUnlocked(self, job):
1114
    """Marks a job as canceled.
1115

1116
    """
1117
    try:
1118
      for op in job.ops:
1119
        op.status = constants.OP_STATUS_ERROR
1120
        op.result = "Job canceled by request"
1121
    finally:
1122
      self.UpdateJobUnlocked(job)
1123

    
1124
  @_RequireOpenQueue
1125
  def _ArchiveJobsUnlocked(self, jobs):
1126
    """Archives jobs.
1127

1128
    @type jobs: list of L{_QueuedJob}
1129
    @param jobs: Job objects
1130
    @rtype: int
1131
    @return: Number of archived jobs
1132

1133
    """
1134
    archive_jobs = []
1135
    rename_files = []
1136
    for job in jobs:
1137
      if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
1138
                                  constants.JOB_STATUS_SUCCESS,
1139
                                  constants.JOB_STATUS_ERROR):
1140
        logging.debug("Job %s is not yet done", job.id)
1141
        continue
1142

    
1143
      archive_jobs.append(job)
1144

    
1145
      old = self._GetJobPath(job.id)
1146
      new = self._GetArchivedJobPath(job.id)
1147
      rename_files.append((old, new))
1148

    
1149
    # TODO: What if 1..n files fail to rename?
1150
    self._RenameFilesUnlocked(rename_files)
1151

    
1152
    logging.debug("Successfully archived job(s) %s",
1153
                  ", ".join(job.id for job in archive_jobs))
1154

    
1155
    return len(archive_jobs)
1156

    
1157
  @utils.LockedMethod
1158
  @_RequireOpenQueue
1159
  def ArchiveJob(self, job_id):
1160
    """Archives a job.
1161

1162
    This is just a wrapper over L{_ArchiveJobsUnlocked}.
1163

1164
    @type job_id: string
1165
    @param job_id: Job ID of job to be archived.
1166
    @rtype: bool
1167
    @return: Whether job was archived
1168

1169
    """
1170
    logging.info("Archiving job %s", job_id)
1171

    
1172
    job = self._LoadJobUnlocked(job_id)
1173
    if not job:
1174
      logging.debug("Job %s not found", job_id)
1175
      return False
1176

    
1177
    return self._ArchiveJobsUnlocked([job]) == 1
1178

    
1179
  @utils.LockedMethod
1180
  @_RequireOpenQueue
1181
  def AutoArchiveJobs(self, age, timeout):
1182
    """Archives all jobs based on age.
1183

1184
    The method will archive all jobs which are older than the age
1185
    parameter. For jobs that don't have an end timestamp, the start
1186
    timestamp will be considered. The special '-1' age will cause
1187
    archival of all jobs (that are not running or queued).
1188

1189
    @type age: int
1190
    @param age: the minimum age in seconds
1191

1192
    """
1193
    logging.info("Archiving jobs with age more than %s seconds", age)
1194

    
1195
    now = time.time()
1196
    end_time = now + timeout
1197
    archived_count = 0
1198
    last_touched = 0
1199

    
1200
    all_job_ids = self._GetJobIDsUnlocked(archived=False)
1201
    pending = []
1202
    for idx, job_id in enumerate(all_job_ids):
1203
      last_touched = idx
1204

    
1205
      # Not optimal because jobs could be pending
1206
      # TODO: Measure average duration for job archival and take number of
1207
      # pending jobs into account.
1208
      if time.time() > end_time:
1209
        break
1210

    
1211
      # Returns None if the job failed to load
1212
      job = self._LoadJobUnlocked(job_id)
1213
      if job:
1214
        if job.end_timestamp is None:
1215
          if job.start_timestamp is None:
1216
            job_age = job.received_timestamp
1217
          else:
1218
            job_age = job.start_timestamp
1219
        else:
1220
          job_age = job.end_timestamp
1221

    
1222
        if age == -1 or now - job_age[0] > age:
1223
          pending.append(job)
1224

    
1225
          # Archive 10 jobs at a time
1226
          if len(pending) >= 10:
1227
            archived_count += self._ArchiveJobsUnlocked(pending)
1228
            pending = []
1229

    
1230
    if pending:
1231
      archived_count += self._ArchiveJobsUnlocked(pending)
1232

    
1233
    return (archived_count, len(all_job_ids) - last_touched - 1)
1234

    
1235
  def _GetJobInfoUnlocked(self, job, fields):
1236
    """Returns information about a job.
1237

1238
    @type job: L{_QueuedJob}
1239
    @param job: the job which we query
1240
    @type fields: list
1241
    @param fields: names of fields to return
1242
    @rtype: list
1243
    @return: list with one element for each field
1244
    @raise errors.OpExecError: when an invalid field
1245
        has been passed
1246

1247
    """
1248
    row = []
1249
    for fname in fields:
1250
      if fname == "id":
1251
        row.append(job.id)
1252
      elif fname == "status":
1253
        row.append(job.CalcStatus())
1254
      elif fname == "ops":
1255
        row.append([op.input.__getstate__() for op in job.ops])
1256
      elif fname == "opresult":
1257
        row.append([op.result for op in job.ops])
1258
      elif fname == "opstatus":
1259
        row.append([op.status for op in job.ops])
1260
      elif fname == "oplog":
1261
        row.append([op.log for op in job.ops])
1262
      elif fname == "opstart":
1263
        row.append([op.start_timestamp for op in job.ops])
1264
      elif fname == "opend":
1265
        row.append([op.end_timestamp for op in job.ops])
1266
      elif fname == "received_ts":
1267
        row.append(job.received_timestamp)
1268
      elif fname == "start_ts":
1269
        row.append(job.start_timestamp)
1270
      elif fname == "end_ts":
1271
        row.append(job.end_timestamp)
1272
      elif fname == "summary":
1273
        row.append([op.input.Summary() for op in job.ops])
1274
      else:
1275
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
1276
    return row
1277

    
1278
  @utils.LockedMethod
1279
  @_RequireOpenQueue
1280
  def QueryJobs(self, job_ids, fields):
1281
    """Returns a list of jobs in queue.
1282

1283
    This is a wrapper of L{_GetJobsUnlocked}, which actually does the
1284
    processing for each job.
1285

1286
    @type job_ids: list
1287
    @param job_ids: sequence of job identifiers or None for all
1288
    @type fields: list
1289
    @param fields: names of fields to return
1290
    @rtype: list
1291
    @return: list one element per job, each element being list with
1292
        the requested fields
1293

1294
    """
1295
    jobs = []
1296

    
1297
    for job in self._GetJobsUnlocked(job_ids):
1298
      if job is None:
1299
        jobs.append(None)
1300
      else:
1301
        jobs.append(self._GetJobInfoUnlocked(job, fields))
1302

    
1303
    return jobs
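  # Illustrative example (not part of the original code):
  # QueryJobs(None, ["id", "status"]) returns one [id, status] list per known
  # job, with None entries for jobs that could not be loaded from disk.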
1304

    
1305
  @utils.LockedMethod
1306
  @_RequireOpenQueue
1307
  def Shutdown(self):
1308
    """Stops the job queue.
1309

1310
    This shuts down all the worker threads and closes the queue.
1311

1312
    """
1313
    self._wpool.TerminateWorkers()
1314

    
1315
    self._queue_lock.Close()
1316
    self._queue_lock = None