#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling.

Locking: there's a single, large lock in the L{JobQueue} class. It's
used by all other classes in this module.

@var JOBQUEUE_THREADS: the number of worker threads we start for
    processing jobs

"""

import os
import logging
import threading
import errno
import re
import time
import weakref

from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc


JOBQUEUE_THREADS = 25
JOBS_PER_ARCHIVE_DIRECTORY = 10000


class CancelJob(Exception):
  """Special exception to cancel a job.

  """


def TimeStampNow():
  """Returns the current timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  return utils.SplitTime(time.time())
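
# Illustrative example (not part of the original module): TimeStampNow()
# simply splits the float returned by time.time() into its integer parts,
# so a timestamp of 1234567890.123456 comes back as (1234567890, 123456).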


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar end_timestamp: timestamp for the end of the execution

  """
  def __init__(self, op):
    """Constructor for the _QueuedOpCode.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.end_timestamp = None

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      }
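
# Illustrative sketch (not part of the original module): Serialize() and
# Restore() are symmetric, so a round-trip preserves the opcode state:
#
#   qop = _QueuedOpCode(op)          # op being any opcodes.OpCode instance
#   state = qop.Serialize()
#   # state is a plain dict along the lines of:
#   #   {"input": {...}, "status": constants.OP_STATUS_QUEUED,
#   #    "result": None, "log": [],
#   #    "start_timestamp": None, "end_timestamp": None}
#   qop2 = _QueuedOpCode.Restore(state)
#   assert qop2.status == qop.status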


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type run_op_index: int
  @ivar run_op_index: the currently executing opcode, or -1 if
      we didn't yet start executing
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar change: a Condition variable we use for waiting for job changes

  """
  def __init__(self, queue, job_id, ops):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: job_id
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes

    """
    if not ops:
      # TODO: use a better exception
      raise Exception("No opcodes")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.run_op_index = -1
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    # Condition to wait for changes
    self.change = threading.Condition(self.queue._lock)

  @classmethod
  def Restore(cls, queue, state):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.run_op_index = state["run_op_index"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    # Condition to wait for changes
    obj.change = threading.Condition(obj.queue._lock)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "run_op_index": self.run_op_index,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled opcode, or one finished with error, the
        job status will be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
          - canceling
          - running

        will determine the job status

      - otherwise, it means either all opcodes are queued, or all
        succeeded, and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status
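
  # Illustrative examples (not part of the original module) of the
  # algorithm above, given the per-opcode statuses of a job:
  #   [SUCCESS, RUNNING, QUEUED] -> JOB_STATUS_RUNNING
  #   [SUCCESS, ERROR, QUEUED]   -> JOB_STATUS_ERROR (one failure fails all)
  #   [QUEUED, QUEUED]           -> JOB_STATUS_QUEUED
  #   [SUCCESS, SUCCESS]         -> JOB_STATUS_SUCCESS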

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries
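
# Illustrative example (not part of the original module): if a job's
# opcodes have accumulated the log entries (1, ts, type, msg), (2, ...)
# and (3, ...), then job.GetLogEntries(2) returns only the entry with
# serial 3, while job.GetLogEntries(None) returns all three.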


class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def _NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the
    LU is finally about to start the Exec() method. Of course, to have
    end-user visible results, the opcode must be initially (before
    calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    """
    assert self.queue, "Queue attribute is missing"
    assert self.opcode, "Opcode attribute is missing"

    self.queue.acquire()
    try:
      assert self.opcode.status in (constants.OP_STATUS_WAITLOCK,
                                    constants.OP_STATUS_CANCELING)

      # Cancel here if we were asked to
      if self.opcode.status == constants.OP_STATUS_CANCELING:
        raise CancelJob()

      self.opcode.status = constants.OP_STATUS_RUNNING
    finally:
      self.queue.release()

  def RunTask(self, job):
    """Job executor.

    This function processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    logging.info("Worker %s processing job %s",
                  self.worker_id, job.id)
    proc = mcpu.Processor(self.pool.queue.context)
    self.queue = queue = job.queue
    try:
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          op_summary = op.input.Summary()
          try:
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
                         op_summary)

            queue.acquire()
            try:
              assert op.status == constants.OP_STATUS_QUEUED
              job.run_op_index = idx
              op.status = constants.OP_STATUS_WAITLOCK
              op.result = None
              op.start_timestamp = TimeStampNow()
              if idx == 0: # first opcode
                job.start_timestamp = op.start_timestamp
              queue.UpdateJobUnlocked(job)

              input_opcode = op.input
            finally:
              queue.release()

            def _Log(*args):
              """Append a log entry.

              """
              assert len(args) < 3

              if len(args) == 1:
                log_type = constants.ELOG_MESSAGE
                log_msg = args[0]
              else:
                log_type, log_msg = args

              # The time is split to make serialization easier and not lose
              # precision.
              timestamp = utils.SplitTime(time.time())

              queue.acquire()
              try:
                job.log_serial += 1
                op.log.append((job.log_serial, timestamp, log_type, log_msg))

                job.change.notifyAll()
              finally:
                queue.release()

            # Make sure not to hold the lock while _Log is called
            self.opcode = op
            result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)

            queue.acquire()
            try:
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
              op.end_timestamp = TimeStampNow()
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.info("Op %s/%s: Successfully finished opcode %s",
                         idx + 1, count, op_summary)
          except CancelJob:
            # Will be handled further up
            raise
          except Exception, err:
            queue.acquire()
            try:
              try:
                op.status = constants.OP_STATUS_ERROR
                op.result = str(err)
                op.end_timestamp = TimeStampNow()
                logging.info("Op %s/%s: Error in opcode %s", idx + 1, count,
                             op_summary)
              finally:
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

      except CancelJob:
        queue.acquire()
        try:
          queue.CancelJobUnlocked(job)
        finally:
          queue.release()
      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
    finally:
      queue.acquire()
      try:
        try:
          job.run_op_index = -1
          job.end_timestamp = TimeStampNow()
          queue.UpdateJobUnlocked(job)
        finally:
          job_id = job.id
          status = job.CalcStatus()
      finally:
        queue.release()
      logging.info("Worker %s finished job %s, status = %s",
                   self.worker_id, job_id, status)


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue
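
# Illustrative flow (not part of the original module): a job roughly
# travels through the classes above as follows:
#
#   JobQueue.SubmitJob(ops)       # wraps the opcodes into a _QueuedJob
#     -> _JobQueueWorkerPool.AddTask(job)
#       -> _JobQueueWorker.RunTask(job)
#         -> mcpu.Processor.ExecOpCode(op, _Log, _NotifyStart)
#
# with _NotifyStart flipping each opcode from OP_STATUS_WAITLOCK to
# OP_STATUS_RUNNING once the LU is about to call Exec().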


class JobQueue(object):
  """Queue used to manage the jobs.

  @cvar _RE_JOB_FILE: regex matching the valid job file names

  """
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def _RequireOpenQueue(fn):
    """Decorator for "public" functions.

    This function should be used for all 'public' functions. That is,
    functions usually called from other classes.

    @warning: Use this decorator only after utils.LockedMethod!

    Example::
      @utils.LockedMethod
      @_RequireOpenQueue
      def Example(self):
        pass

    """
    def wrapper(self, *args, **kwargs):
      assert self._queue_lock is not None, "Queue should be open"
      return fn(self, *args, **kwargs)
    return wrapper

  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = utils.HostInfo().name

    # Locking
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    try:
      del self._nodes[self._my_hostname]
    except KeyError:
      pass

    # TODO: Check consistency across nodes

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      # We need to lock here because WorkerPool.AddTask() may start a job while
      # we're still doing our work.
      self.acquire()
      try:
        logging.info("Inspecting job queue")

        all_job_ids = self._GetJobIDsUnlocked()
        jobs_count = len(all_job_ids)
        lastinfo = time.time()
        for idx, job_id in enumerate(all_job_ids):
          # Give an update every 1000 jobs or 10 seconds
          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
              idx == (jobs_count - 1)):
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
            lastinfo = time.time()

          job = self._LoadJobUnlocked(job_id)

          # a failure in loading the job can cause 'None' to be returned
          if job is None:
            continue

          status = job.CalcStatus()

          if status in (constants.JOB_STATUS_QUEUED, ):
            self._wpool.AddTask(job)

          elif status in (constants.JOB_STATUS_RUNNING,
                          constants.JOB_STATUS_WAITLOCK,
                          constants.JOB_STATUS_CANCELING):
            logging.warning("Unfinished job %s found: %s", job.id, job)
            try:
              for op in job.ops:
                op.status = constants.OP_STATUS_ERROR
                op.result = "Unclean master daemon shutdown"
            finally:
              self.UpdateJobUnlocked(job)

        logging.info("Job queue inspection finished")
      finally:
        self.release()
    except:
      self._wpool.TerminateWorkers()
      raise

  @utils.LockedMethod
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    rpc.RpcRunner.call_jobqueue_purge(node_name)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      # Read file content
      fd = open(file_name, "r")
      try:
        content = fd.read()
      finally:
        fd.close()

      result = rpc.RpcRunner.call_jobqueue_update([node_name],
                                                  [node.primary_ip],
                                                  file_name, content)
      if not result[node_name]:
        logging.error("Failed to upload %s to %s", file_name, node_name)

    self._nodes[node_name] = node.primary_ip

  @utils.LockedMethod
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    try:
      # The queue is removed by the "leave node" RPC call.
      del self._nodes[node_name]
    except KeyError:
      pass

  def _CheckRpcResult(self, result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our rpc calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      if result[node]:
        success.append(node)
      else:
        failed.append(node)

    if failed:
      logging.error("%s failed on %s", failmsg, ", ".join(failed))

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _WriteAndReplicateFileUnlocked(self, file_name, data):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file

    """
    utils.WriteFile(file_name, data=data)

    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
    self._CheckRpcResult(result, self._nodes,
                         "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames files locally and then replicates the changes.

    This function will rename files in the local queue directory
    and then replicate the renames to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

  def _FormatJobID(self, job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this will
    abstract the change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id

    """
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

  @classmethod
  def _GetArchiveDirectory(cls, job_id):
    """Returns the archive directory for a job.

    @type job_id: str
    @param job_id: Job identifier
    @rtype: str
    @return: Directory name

    """
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)

  def _NewSerialUnlocked(self):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    @rtype: str
    @return: a string representing the job identifier.

    """
    # New number
    serial = self._last_serial + 1

    # Write to file
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                        "%s\n" % serial)

    # Keep it only if we were able to write the file
    self._last_serial = serial

    return self._FormatJobID(serial)

  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)

  @classmethod
  def _GetArchivedJobPath(cls, job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    path = "%s/job-%s" % (cls._GetArchiveDirectory(job_id), job_id)
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, path)
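
  # Illustrative example (not part of the original module): with
  # JOBS_PER_ARCHIVE_DIRECTORY = 10000, job 123456 lives at
  # "<QUEUE_DIR>/job-123456" while live, and its archived path is
  # "<JOB_QUEUE_ARCHIVE_DIR>/12/job-123456", since 123456 / 10000 == 12
  # in integer division.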

  @classmethod
  def _ExtractJobID(cls, name):
    """Extract the job id from a filename.

    @type name: str
    @param name: the job filename
    @rtype: job id or None
    @return: the job id corresponding to the given filename,
        or None if the filename does not represent a valid
        job file

    """
    m = cls._RE_JOB_FILE.match(name)
    if m:
      return m.group(1)
    else:
      return None

  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived job IDs will be
    included. Currently this argument is unused.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @rtype: list
    @return: the list of job IDs

    """
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
    jlist = utils.NiceSort(jlist)
    return jlist

  def _ListJobFiles(self):
    """Returns the list of current job files.

    @rtype: list
    @return: the list of job file names

    """
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
            if self._RE_JOB_FILE.match(name)]

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if it
    exists, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      return job

    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      fd = open(filepath, "r")
    except IOError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise
    try:
      data = serializer.LoadJson(fd.read())
    finally:
      fd.close()

    try:
      job = _QueuedJob.Restore(self, data)
    except Exception, err:
      new_path = self._GetArchivedJobPath(job_id)
      if filepath == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(filepath, new_path)])
      return None

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _GetJobsUnlocked(self, job_ids):
    """Return a list of jobs based on their IDs.

    @type job_ids: list
    @param job_ids: either an empty list (meaning all jobs),
        or a list of job IDs
    @rtype: list
    @return: the list of job objects

    """
    if not job_ids:
      job_ids = self._GetJobIDsUnlocked()

    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]

  @staticmethod
  def _IsQueueMarkedDrain():
    """Check if the queue is marked for draining.

    This currently uses the queue drain file, which makes it a
    per-node flag. In the future this can be moved to the config file.

    @rtype: boolean
    @return: True if the job queue is marked for draining

    """
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)

  @staticmethod
  def SetDrainFlag(drain_flag):
    """Sets the drain flag for the queue.

    This is similar to the function L{backend.JobQueueSetDrainFlag},
    and in the future we might merge them.

    @type drain_flag: boolean
    @param drain_flag: whether to set or unset the drain flag

    """
    if drain_flag:
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
    else:
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
    return True
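
  # Illustrative example (not part of the original module): draining is
  # just the presence of a flag file, so after SetDrainFlag(True) every
  # subsequent SubmitJob() call raises errors.JobQueueDrainError until
  # SetDrainFlag(False) removes the file again.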

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: job ID
    @return: the job ID of the newly created job
    @raise errors.JobQueueDrainError: if the queue is marked for draining

    """
    if self._IsQueueMarkedDrain():
      raise errors.JobQueueDrainError()

    # Check job queue size
    size = len(self._ListJobFiles())
    if size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
      # TODO: Autoarchive jobs. Make sure it's not done on every job
      # submission, though.
      #size = ...
      pass

    if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    # Get job identifier
    job_id = self._NewSerialUnlocked()
    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self.UpdateJobUnlocked(job)

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    # Add to worker pool
    self._wpool.AddTask(job)

    return job.id
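
  # Illustrative usage sketch (not part of the original module; the
  # opcode chosen is only an example) from code holding a JobQueue
  # instance:
  #
  #   job_id = queue.SubmitJob([opcodes.OpQueryClusterInfo()])
  #   (status, ) = queue.QueryJobs([job_id], ["status"])[0]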

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job

    """
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._WriteAndReplicateFileUnlocked(filename, data)

    # Notify waiters about potential changes
    job.change.notifyAll()

  @utils.LockedMethod
  @_RequireOpenQueue
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    logging.debug("Waiting for changes in job %s", job_id)
    end_time = time.time() + timeout
    while True:
      delta_time = end_time - time.time()
      if delta_time < 0:
        return constants.JOB_NOTCHANGED

      job = self._LoadJobUnlocked(job_id)
      if not job:
        logging.debug("Job %s not found", job_id)
        break

      status = job.CalcStatus()
      job_info = self._GetJobInfoUnlocked(job, fields)
      log_entries = job.GetLogEntries(prev_log_serial)

      # Serializing and deserializing data can cause type changes (e.g. from
      # tuple to list) or precision loss. We're doing it here so that we get
      # the same modifications as the data received from the client. Without
      # this, the comparison afterwards might fail without the data being
      # significantly different.
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

      if status not in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_RUNNING,
                        constants.JOB_STATUS_WAITLOCK):
        # Don't even try to wait if the job is no longer running, there will be
        # no changes.
        break

      if (prev_job_info != job_info or
          (log_entries and prev_log_serial != log_entries[0][0])):
        break

      logging.debug("Waiting again")

      # Release the queue lock while waiting
      job.change.wait(delta_time)

    logging.debug("Job %s changed", job_id)

    return (job_info, log_entries)
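
  # Illustrative client loop (not part of the original module): callers
  # typically poll WaitForJobChanges() until the job reaches a final
  # state, feeding back the previously returned values:
  #
  #   prev_info, prev_serial = None, 0
  #   while True:
  #     result = queue.WaitForJobChanges(job_id, ["status"], prev_info,
  #                                      prev_serial, timeout)
  #     if result == constants.JOB_NOTCHANGED:
  #       continue                          # timed out, just ask again
  #     prev_info, log_entries = result
  #     if log_entries:
  #       prev_serial = log_entries[-1][0]
  #     if prev_info[0] not in (constants.JOB_STATUS_QUEUED,
  #                             constants.JOB_STATUS_WAITLOCK,
  #                             constants.JOB_STATUS_RUNNING):
  #       break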

  @utils.LockedMethod
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    job_status = job.CalcStatus()

    if job_status not in (constants.JOB_STATUS_QUEUED,
                          constants.JOB_STATUS_WAITLOCK):
      logging.debug("Job %s is no longer in the queue", job.id)
      return (False, "Job %s is no longer in the queue" % job.id)

    if job_status == constants.JOB_STATUS_QUEUED:
      self.CancelJobUnlocked(job)
      return (True, "Job %s canceled" % job.id)

    elif job_status == constants.JOB_STATUS_WAITLOCK:
      # The worker will notice the new status and cancel the job
      try:
        for op in job.ops:
          op.status = constants.OP_STATUS_CANCELING
      finally:
        self.UpdateJobUnlocked(job)
      return (True, "Job %s will be canceled" % job.id)

  @_RequireOpenQueue
  def CancelJobUnlocked(self, job):
    """Marks a job as canceled.

    """
    try:
      for op in job.ops:
        op.status = constants.OP_STATUS_ERROR
        op.result = "Job canceled by request"
    finally:
      self.UpdateJobUnlocked(job)

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
                                  constants.JOB_STATUS_SUCCESS,
                                  constants.JOB_STATUS_ERROR):
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  ", ".join(job.id for job in archive_jobs))

    return len(archive_jobs)

  @utils.LockedMethod
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @utils.LockedMethod
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: float
    @param timeout: the maximum amount of time to spend archiving

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked(archived=False)
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched - 1)
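
  # Illustrative example (not part of the original module): a caller could
  # use queue.AutoArchiveJobs(6 * 3600, 30.0) to archive jobs that finished
  # more than six hours ago while spending roughly thirty seconds at most;
  # the returned tuple is (number of jobs archived, number of jobs left
  # unexamined when the timeout hit).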

  def _GetJobInfoUnlocked(self, job, fields):
    """Returns information about a job.

    @type job: L{_QueuedJob}
    @param job: the job which we query
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        has been passed

    """
    row = []
    for fname in fields:
      if fname == "id":
        row.append(job.id)
      elif fname == "status":
        row.append(job.CalcStatus())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in job.ops])
      elif fname == "opresult":
        row.append([op.result for op in job.ops])
      elif fname == "opstatus":
        row.append([op.status for op in job.ops])
      elif fname == "oplog":
        row.append([op.log for op in job.ops])
      elif fname == "opstart":
        row.append([op.start_timestamp for op in job.ops])
      elif fname == "opend":
        row.append([op.end_timestamp for op in job.ops])
      elif fname == "received_ts":
        row.append(job.received_timestamp)
      elif fname == "start_ts":
        row.append(job.start_timestamp)
      elif fname == "end_ts":
        row.append(job.end_timestamp)
      elif fname == "summary":
        row.append([op.input.Summary() for op in job.ops])
      else:
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
    return row
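
  # Illustrative example (not part of the original module): for a job with
  # two opcodes, _GetJobInfoUnlocked(job, ["id", "status", "opstatus"])
  # would return something along the lines of:
  #   ["42", JOB_STATUS_RUNNING, [OP_STATUS_SUCCESS, OP_STATUS_RUNNING]]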

  @utils.LockedMethod
  @_RequireOpenQueue
  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    This is a wrapper of L{_GetJobsUnlocked}, which actually does the
    processing for each job.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element per job, each element being a list
        with the requested fields

    """
    jobs = []

    for job in self._GetJobsUnlocked(job_ids):
      if job is None:
        jobs.append(None)
      else:
        jobs.append(self._GetJobInfoUnlocked(job, fields))

    return jobs

  @utils.LockedMethod
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_lock.Close()
    self._queue_lock = None