#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling.

Locking: there's a single, large lock in the L{JobQueue} class. It's
used by all other classes in this module.

@var JOBQUEUE_THREADS: the number of worker threads we start for
    processing jobs

"""

import os
import logging
import threading
import errno
import re
import time
import weakref

from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc


JOBQUEUE_THREADS = 25
JOBS_PER_ARCHIVE_DIRECTORY = 10000


class CancelJob(Exception):
  """Special exception to cancel a job.

  """


def TimeStampNow():
  """Returns the current timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  return utils.SplitTime(time.time())
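# Illustrative example (added note, not part of the original module): with
# utils.SplitTime behaving as documented above, calling TimeStampNow() at
# Unix time 1230000000.25 would return (1230000000, 250000), i.e. whole
# seconds plus the microsecond remainder.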


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar end_timestamp: timestamp for the end of the execution

  """
  def __init__(self, op):
    """Constructor for the _QueuedOpCode.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.end_timestamp = None

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      }


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type run_op_index: int
  @ivar run_op_index: the currently executing opcode, or -1 if
      we didn't yet start executing
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar change: a Condition variable we use for waiting for job changes

  """
  def __init__(self, queue, job_id, ops):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: job_id
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes

    """
    if not ops:
      # TODO: use a better exception
      raise Exception("No opcodes")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.run_op_index = -1
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    # Condition to wait for changes
    self.change = threading.Condition(self.queue._lock)

  @classmethod
  def Restore(cls, queue, state):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.run_op_index = state["run_op_index"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    # Condition to wait for changes
    obj.change = threading.Condition(obj.queue._lock)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "run_op_index": self.run_op_index,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled, or finished with error, opcode, the job
        status will be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
          - canceling
          - running

        will determine the job status

      - otherwise, it means either all opcodes are queued, or success,
        and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status
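  # Worked example (added for illustration, not part of the original code):
  # for a job whose opcode statuses are
  #   [OP_STATUS_SUCCESS, OP_STATUS_RUNNING, OP_STATUS_QUEUED]
  # the loop above leaves status = JOB_STATUS_RUNNING; if all opcodes are
  # OP_STATUS_SUCCESS the job is JOB_STATUS_SUCCESS, and a single
  # OP_STATUS_ERROR opcode makes the whole job JOB_STATUS_ERROR.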

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries
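  # Illustrative example (added, not part of the original code): with op.log
  # holding (1, ts, ELOG_MESSAGE, "step one") and (2, ts, ELOG_MESSAGE,
  # "step two"), GetLogEntries(1) returns only the second tuple, while
  # GetLogEntries(None) returns both.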


class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def _NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the
    LU is finally about to start the Exec() method. Of course, to have
    end-user visible results, the opcode must be initially (before
    calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    """
    assert self.queue, "Queue attribute is missing"
    assert self.opcode, "Opcode attribute is missing"

    self.queue.acquire()
    try:
      assert self.opcode.status in (constants.OP_STATUS_WAITLOCK,
                                    constants.OP_STATUS_CANCELING)

      # Cancel here if we were asked to
      if self.opcode.status == constants.OP_STATUS_CANCELING:
        raise CancelJob()

      self.opcode.status = constants.OP_STATUS_RUNNING
    finally:
      self.queue.release()

  def RunTask(self, job):
    """Job executor.

    This function processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    logging.debug("Worker %s processing job %s",
                  self.worker_id, job.id)
    proc = mcpu.Processor(self.pool.queue.context)
    self.queue = queue = job.queue
    try:
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          try:
            logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)

            queue.acquire()
            try:
              assert op.status == constants.OP_STATUS_QUEUED
              job.run_op_index = idx
              op.status = constants.OP_STATUS_WAITLOCK
              op.result = None
              op.start_timestamp = TimeStampNow()
              if idx == 0: # first opcode
                job.start_timestamp = op.start_timestamp
              queue.UpdateJobUnlocked(job)

              input_opcode = op.input
            finally:
              queue.release()

            def _Log(*args):
              """Append a log entry.

              """
              assert len(args) < 3

              if len(args) == 1:
                log_type = constants.ELOG_MESSAGE
                log_msg = args[0]
              else:
                log_type, log_msg = args

              # The time is split to make serialization easier and not lose
              # precision.
              timestamp = utils.SplitTime(time.time())

              queue.acquire()
              try:
                job.log_serial += 1
                op.log.append((job.log_serial, timestamp, log_type, log_msg))

                job.change.notifyAll()
              finally:
                queue.release()

            # Make sure not to hold lock while _Log is called
            self.opcode = op
            result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)

            queue.acquire()
            try:
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
              op.end_timestamp = TimeStampNow()
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.debug("Op %s/%s: Successfully finished %s",
                          idx + 1, count, op)
          except CancelJob:
            # Will be handled further up
            raise
          except Exception, err:
            queue.acquire()
            try:
              try:
                op.status = constants.OP_STATUS_ERROR
                op.result = str(err)
                op.end_timestamp = TimeStampNow()
                logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
              finally:
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

      except CancelJob:
        queue.acquire()
        try:
          queue.CancelJobUnlocked(job)
        finally:
          queue.release()
      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
    finally:
      queue.acquire()
      try:
        try:
          job.run_op_index = -1
          job.end_timestamp = TimeStampNow()
          queue.UpdateJobUnlocked(job)
        finally:
          job_id = job.id
          status = job.CalcStatus()
      finally:
        queue.release()
      logging.debug("Worker %s finished job %s, status = %s",
                    self.worker_id, job_id, status)

class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


class JobQueue(object):
  """Queue used to manage the jobs.

  @cvar _RE_JOB_FILE: regex matching the valid job file names

  """
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def _RequireOpenQueue(fn):
    """Decorator for "public" functions.

    This function should be used for all 'public' functions. That is,
    functions usually called from other classes.

    @warning: Use this decorator only after utils.LockedMethod!

    Example::
      @utils.LockedMethod
      @_RequireOpenQueue
      def Example(self):
        pass

    """
    def wrapper(self, *args, **kwargs):
      assert self._queue_lock is not None, "Queue should be open"
      return fn(self, *args, **kwargs)
    return wrapper

  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = utils.HostInfo().name

    # Locking
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    try:
      del self._nodes[self._my_hostname]
    except KeyError:
      pass

    # TODO: Check consistency across nodes

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      # We need to lock here because WorkerPool.AddTask() may start a job while
      # we're still doing our work.
      self.acquire()
      try:
        logging.info("Inspecting job queue")

        all_job_ids = self._GetJobIDsUnlocked()
        jobs_count = len(all_job_ids)
        lastinfo = time.time()
        for idx, job_id in enumerate(all_job_ids):
          # Give an update every 1000 jobs or 10 seconds
          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
              idx == (jobs_count - 1)):
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
            lastinfo = time.time()

          job = self._LoadJobUnlocked(job_id)

          # a failure in loading the job can cause 'None' to be returned
          if job is None:
            continue

          status = job.CalcStatus()

          if status in (constants.JOB_STATUS_QUEUED, ):
            self._wpool.AddTask(job)

          elif status in (constants.JOB_STATUS_RUNNING,
                          constants.JOB_STATUS_WAITLOCK,
                          constants.JOB_STATUS_CANCELING):
            logging.warning("Unfinished job %s found: %s", job.id, job)
            try:
              for op in job.ops:
                op.status = constants.OP_STATUS_ERROR
                op.result = "Unclean master daemon shutdown"
            finally:
              self.UpdateJobUnlocked(job)

        logging.info("Job queue inspection finished")
      finally:
        self.release()
    except:
      self._wpool.TerminateWorkers()
      raise

  @utils.LockedMethod
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    rpc.RpcRunner.call_jobqueue_purge(node_name)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      # Read file content
      fd = open(file_name, "r")
      try:
        content = fd.read()
      finally:
        fd.close()

      result = rpc.RpcRunner.call_jobqueue_update([node_name],
                                                  [node.primary_ip],
                                                  file_name, content)
      if not result[node_name]:
        logging.error("Failed to upload %s to %s", file_name, node_name)

    self._nodes[node_name] = node.primary_ip

  @utils.LockedMethod
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    try:
      # The queue is removed by the "leave node" RPC call.
      del self._nodes[node_name]
    except KeyError:
      pass

  def _CheckRpcResult(self, result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our rpc calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      if result[node]:
        success.append(node)
      else:
        failed.append(node)

    if failed:
      logging.error("%s failed on %s", failmsg, ", ".join(failed))

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _WriteAndReplicateFileUnlocked(self, file_name, data):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file

    """
    utils.WriteFile(file_name, data=data)

    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
    self._CheckRpcResult(result, self._nodes,
                         "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames files locally and then replicates the changes.

    This function will rename files in the local queue directory
    and then replicate these renames to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

  def _FormatJobID(self, job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this will
    abstract this change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id

    """
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

  @classmethod
  def _GetArchiveDirectory(cls, job_id):
    """Returns the archive directory for a job.

    @type job_id: str
    @param job_id: Job identifier
    @rtype: str
    @return: Directory name

    """
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
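  # Illustrative example (added, not part of the original code): with
  # JOBS_PER_ARCHIVE_DIRECTORY = 10000, job id "9999" maps to directory "0"
  # and job id "12345" maps to directory "1".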

  def _NewSerialUnlocked(self):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    @rtype: str
    @return: a string representing the job identifier.

    """
    # New number
    serial = self._last_serial + 1

    # Write to file
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                        "%s\n" % serial)

    # Keep it only if we were able to write the file
    self._last_serial = serial

    return self._FormatJobID(serial)

  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)

  @classmethod
  def _GetArchivedJobPath(cls, job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    path = "%s/job-%s" % (cls._GetArchiveDirectory(job_id), job_id)
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, path)

  @classmethod
  def _ExtractJobID(cls, name):
    """Extract the job id from a filename.

    @type name: str
    @param name: the job filename
    @rtype: job id or None
    @return: the job id corresponding to the given filename,
        or None if the filename does not represent a valid
        job file

    """
    m = cls._RE_JOB_FILE.match(name)
    if m:
      return m.group(1)
    else:
      return None
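  # Illustrative example (added; the concrete queue directory is an
  # assumption): if constants.QUEUE_DIR were "/var/lib/ganeti/queue", job id
  # "4302" would be stored as "/var/lib/ganeti/queue/job-4302", its archived
  # copy would live under archive subdirectory "0", and
  # _ExtractJobID("job-4302") would return "4302".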

  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived job IDs will be
    included. Currently this argument is unused.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @rtype: list
    @return: the list of job IDs

    """
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
    jlist = utils.NiceSort(jlist)
    return jlist

  def _ListJobFiles(self):
    """Returns the list of current job files.

    @rtype: list
    @return: the list of job file names

    """
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
            if self._RE_JOB_FILE.match(name)]

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      return job

    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      fd = open(filepath, "r")
    except IOError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise
    try:
      data = serializer.LoadJson(fd.read())
    finally:
      fd.close()

    try:
      job = _QueuedJob.Restore(self, data)
    except Exception, err:
      new_path = self._GetArchivedJobPath(job_id)
      if filepath == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(filepath, new_path)])
      return None

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _GetJobsUnlocked(self, job_ids):
    """Return a list of jobs based on their IDs.

    @type job_ids: list
    @param job_ids: either an empty list (meaning all jobs),
        or a list of job IDs
    @rtype: list
    @return: the list of job objects

    """
    if not job_ids:
      job_ids = self._GetJobIDsUnlocked()

    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]

  @staticmethod
  def _IsQueueMarkedDrain():
    """Check if the queue is marked for drain.

    This currently uses the queue drain file, which makes it a
    per-node flag. In the future this can be moved to the config file.

    @rtype: boolean
    @return: True if the job queue is marked for draining

    """
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)

  @staticmethod
  def SetDrainFlag(drain_flag):
    """Sets the drain flag for the queue.

    This is similar to the function L{backend.JobQueueSetDrainFlag},
    and in the future we might merge them.

    @type drain_flag: boolean
    @param drain_flag: whether to set or unset the drain flag

    """
    if drain_flag:
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
    else:
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
    return True

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: job ID
    @return: the job ID of the newly created job
    @raise errors.JobQueueDrainError: if the queue is marked for draining

    """
    if self._IsQueueMarkedDrain():
      raise errors.JobQueueDrainError()

    # Check job queue size
    size = len(self._ListJobFiles())
    if size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
      # TODO: Autoarchive jobs. Make sure it's not done on every job
      # submission, though.
      #size = ...
      pass

    if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    # Get job identifier
    job_id = self._NewSerialUnlocked()
    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self.UpdateJobUnlocked(job)

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    # Add to worker pool
    self._wpool.AddTask(job)

    return job.id
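  # Hedged usage sketch (added for illustration; the exact opcode class used
  # here is an assumption, not something this module defines):
  #
  #   queue = JobQueue(context)
  #   job_id = queue.SubmitJob([opcodes.OpQueryClusterInfo()])
  #   print queue.QueryJobs([job_id], ["status"])
  #
  # SubmitJob only enqueues the work; the _JobQueueWorker threads pick the
  # job up asynchronously.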

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job

    """
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._WriteAndReplicateFileUnlocked(filename, data)

    # Notify waiters about potential changes
    job.change.notifyAll()

  @utils.LockedMethod
  @_RequireOpenQueue
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    logging.debug("Waiting for changes in job %s", job_id)
    end_time = time.time() + timeout
    while True:
      delta_time = end_time - time.time()
      if delta_time < 0:
        return constants.JOB_NOTCHANGED

      job = self._LoadJobUnlocked(job_id)
      if not job:
        logging.debug("Job %s not found", job_id)
        break

      status = job.CalcStatus()
      job_info = self._GetJobInfoUnlocked(job, fields)
      log_entries = job.GetLogEntries(prev_log_serial)

      # Serializing and deserializing data can cause type changes (e.g. from
      # tuple to list) or precision loss. We're doing it here so that we get
      # the same modifications as the data received from the client. Without
      # this, the comparison afterwards might fail without the data being
      # significantly different.
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

      if status not in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_RUNNING,
                        constants.JOB_STATUS_WAITLOCK):
        # Don't even try to wait if the job is no longer running, there will be
        # no changes.
        break

      if (prev_job_info != job_info or
          (log_entries and prev_log_serial != log_entries[0][0])):
        break

      logging.debug("Waiting again")

      # Release the queue lock while waiting
      job.change.wait(delta_time)

    logging.debug("Job %s changed", job_id)

    return (job_info, log_entries)
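  # Hedged client-side sketch (added for illustration, not part of the
  # original code): a caller would typically poll until the job leaves the
  # active states, e.g.
  #
  #   prev_info, prev_serial = None, 0
  #   while True:
  #     result = queue.WaitForJobChanges(job_id, ["status"], prev_info,
  #                                      prev_serial, 10.0)
  #     if result == constants.JOB_NOTCHANGED:
  #       continue
  #     prev_info, log_entries = result
  #     if log_entries:
  #       prev_serial = log_entries[-1][0]
  #     if prev_info[0] not in (constants.JOB_STATUS_QUEUED,
  #                             constants.JOB_STATUS_WAITLOCK,
  #                             constants.JOB_STATUS_RUNNING):
  #       break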

  @utils.LockedMethod
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    job_status = job.CalcStatus()

    if job_status not in (constants.JOB_STATUS_QUEUED,
                          constants.JOB_STATUS_WAITLOCK):
      logging.debug("Job %s is no longer in the queue", job.id)
      return (False, "Job %s is no longer in the queue" % job.id)

    if job_status == constants.JOB_STATUS_QUEUED:
      self.CancelJobUnlocked(job)
      return (True, "Job %s canceled" % job.id)

    elif job_status == constants.JOB_STATUS_WAITLOCK:
      # The worker will notice the new status and cancel the job
      try:
        for op in job.ops:
          op.status = constants.OP_STATUS_CANCELING
      finally:
        self.UpdateJobUnlocked(job)
      return (True, "Job %s will be canceled" % job.id)

  @_RequireOpenQueue
  def CancelJobUnlocked(self, job):
    """Marks a job as canceled.

    """
    try:
      for op in job.ops:
        op.status = constants.OP_STATUS_ERROR
        op.result = "Job canceled by request"
    finally:
      self.UpdateJobUnlocked(job)

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
                                  constants.JOB_STATUS_SUCCESS,
                                  constants.JOB_STATUS_ERROR):
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  ", ".join(job.id for job in archive_jobs))

    return len(archive_jobs)

  @utils.LockedMethod
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @utils.LockedMethod
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: int
    @param timeout: the maximum time (in seconds) to spend archiving

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked(archived=False)
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched - 1)

  def _GetJobInfoUnlocked(self, job, fields):
    """Returns information about a job.

    @type job: L{_QueuedJob}
    @param job: the job which we query
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        has been passed

    """
    row = []
    for fname in fields:
      if fname == "id":
        row.append(job.id)
      elif fname == "status":
        row.append(job.CalcStatus())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in job.ops])
      elif fname == "opresult":
        row.append([op.result for op in job.ops])
      elif fname == "opstatus":
        row.append([op.status for op in job.ops])
      elif fname == "oplog":
        row.append([op.log for op in job.ops])
      elif fname == "opstart":
        row.append([op.start_timestamp for op in job.ops])
      elif fname == "opend":
        row.append([op.end_timestamp for op in job.ops])
      elif fname == "received_ts":
        row.append(job.received_timestamp)
      elif fname == "start_ts":
        row.append(job.start_timestamp)
      elif fname == "end_ts":
        row.append(job.end_timestamp)
      elif fname == "summary":
        row.append([op.input.Summary() for op in job.ops])
      else:
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
    return row

  @utils.LockedMethod
  @_RequireOpenQueue
  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    This is a wrapper of L{_GetJobsUnlocked}, which actually does the
    processing for each job.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: a list with one element per job, each element being a
        list with the requested fields

    """
    jobs = []

    for job in self._GetJobsUnlocked(job_ids):
      if job is None:
        jobs.append(None)
      else:
        jobs.append(self._GetJobInfoUnlocked(job, fields))

    return jobs
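  # Illustrative example (added, not part of the original code): given the
  # fields handled by _GetJobInfoUnlocked above, a call such as
  #   queue.QueryJobs(["1", "2"], ["id", "status", "summary"])
  # returns one three-element row per job, e.g.
  #   [["1", "success", ["CLUSTER_VERIFY"]], ["2", "running", ["NODE_ADD"]]]
  # (status strings and summaries are placeholders that depend on the actual
  # constants and opcodes).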

  @utils.LockedMethod
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_lock.Close()
    self._queue_lock = None