#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling.

Locking: there's a single, large lock in the L{JobQueue} class. It's
used by all other classes in this module.

@var JOBQUEUE_THREADS: the number of worker threads we start for
    processing jobs

"""

import os
import logging
import threading
import errno
import re
import time
import weakref

from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc


JOBQUEUE_THREADS = 25
JOBS_PER_ARCHIVE_DIRECTORY = 10000


class CancelJob(Exception):
  """Special exception to cancel a job.

  """


def TimeStampNow():
  """Returns the current timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  return utils.SplitTime(time.time())


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar end_timestamp: timestamp for the end of the execution

  """
  def __init__(self, op):
    """Constructor for the _QueuedOpCode.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.end_timestamp = None

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      }


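# Illustrative sketch (added, not part of the original module):
# _QueuedOpCode.Serialize and _QueuedOpCode.Restore are meant to round-trip
# through a plain dict, which is what later gets JSON-encoded into the job
# file.  Assuming 'some_op' is any opcodes.OpCode instance (a placeholder
# name used only here):
#
#   queued_op = _QueuedOpCode(some_op)
#   state = queued_op.Serialize()
#   # state looks like:
#   #   {"input": <opcode state dict>, "status": constants.OP_STATUS_QUEUED,
#   #    "result": None, "log": [], "start_timestamp": None,
#   #    "end_timestamp": None}
#   same_op = _QueuedOpCode.Restore(state)
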
class _QueuedJob(object):
134
  """In-memory job representation.
135

136
  This is what we use to track the user-submitted jobs. Locking must
137
  be taken care of by users of this class.
138

139
  @type queue: L{JobQueue}
140
  @ivar queue: the parent queue
141
  @ivar id: the job ID
142
  @type ops: list
143
  @ivar ops: the list of _QueuedOpCode that constitute the job
144
  @type run_op_index: int
145
  @ivar run_op_index: the currently executing opcode, or -1 if
146
      we didn't yet start executing
147
  @type log_serial: int
148
  @ivar log_serial: holds the index for the next log entry
149
  @ivar received_timestamp: the timestamp for when the job was received
150
  @ivar start_timestmap: the timestamp for start of execution
151
  @ivar end_timestamp: the timestamp for end of execution
152
  @ivar change: a Condition variable we use for waiting for job changes
153

154
  """
155
  def __init__(self, queue, job_id, ops):
156
    """Constructor for the _QueuedJob.
157

158
    @type queue: L{JobQueue}
159
    @param queue: our parent queue
160
    @type job_id: job_id
161
    @param job_id: our job id
162
    @type ops: list
163
    @param ops: the list of opcodes we hold, which will be encapsulated
164
        in _QueuedOpCodes
165

166
    """
167
    if not ops:
168
      # TODO: use a better exception
169
      raise Exception("No opcodes")
170

    
171
    self.queue = queue
172
    self.id = job_id
173
    self.ops = [_QueuedOpCode(op) for op in ops]
174
    self.run_op_index = -1
175
    self.log_serial = 0
176
    self.received_timestamp = TimeStampNow()
177
    self.start_timestamp = None
178
    self.end_timestamp = None
179

    
180
    # Condition to wait for changes
181
    self.change = threading.Condition(self.queue._lock)
182

    
183
  @classmethod
184
  def Restore(cls, queue, state):
185
    """Restore a _QueuedJob from serialized state:
186

187
    @type queue: L{JobQueue}
188
    @param queue: to which queue the restored job belongs
189
    @type state: dict
190
    @param state: the serialized state
191
    @rtype: _JobQueue
192
    @return: the restored _JobQueue instance
193

194
    """
195
    obj = _QueuedJob.__new__(cls)
196
    obj.queue = queue
197
    obj.id = state["id"]
198
    obj.run_op_index = state["run_op_index"]
199
    obj.received_timestamp = state.get("received_timestamp", None)
200
    obj.start_timestamp = state.get("start_timestamp", None)
201
    obj.end_timestamp = state.get("end_timestamp", None)
202

    
203
    obj.ops = []
204
    obj.log_serial = 0
205
    for op_state in state["ops"]:
206
      op = _QueuedOpCode.Restore(op_state)
207
      for log_entry in op.log:
208
        obj.log_serial = max(obj.log_serial, log_entry[0])
209
      obj.ops.append(op)
210

    
211
    # Condition to wait for changes
212
    obj.change = threading.Condition(obj.queue._lock)
213

    
214
    return obj
215

    
216
  def Serialize(self):
217
    """Serialize the _JobQueue instance.
218

219
    @rtype: dict
220
    @return: the serialized state
221

222
    """
223
    return {
224
      "id": self.id,
225
      "ops": [op.Serialize() for op in self.ops],
226
      "run_op_index": self.run_op_index,
227
      "start_timestamp": self.start_timestamp,
228
      "end_timestamp": self.end_timestamp,
229
      "received_timestamp": self.received_timestamp,
230
      }
231

    
232
  def CalcStatus(self):
233
    """Compute the status of this job.
234

235
    This function iterates over all the _QueuedOpCodes in the job and
236
    based on their status, computes the job status.
237

238
    The algorithm is:
239
      - if we find a cancelled, or finished with error, the job
240
        status will be the same
241
      - otherwise, the last opcode with the status one of:
242
          - waitlock
243
          - canceling
244
          - running
245

246
        will determine the job status
247

248
      - otherwise, it means either all opcodes are queued, or success,
249
        and the job status will be the same
250

251
    @return: the job status
252

253
    """
254
    status = constants.JOB_STATUS_QUEUED
255

    
256
    all_success = True
257
    for op in self.ops:
258
      if op.status == constants.OP_STATUS_SUCCESS:
259
        continue
260

    
261
      all_success = False
262

    
263
      if op.status == constants.OP_STATUS_QUEUED:
264
        pass
265
      elif op.status == constants.OP_STATUS_WAITLOCK:
266
        status = constants.JOB_STATUS_WAITLOCK
267
      elif op.status == constants.OP_STATUS_RUNNING:
268
        status = constants.JOB_STATUS_RUNNING
269
      elif op.status == constants.OP_STATUS_CANCELING:
270
        status = constants.JOB_STATUS_CANCELING
271
        break
272
      elif op.status == constants.OP_STATUS_ERROR:
273
        status = constants.JOB_STATUS_ERROR
274
        # The whole job fails if one opcode failed
275
        break
276
      elif op.status == constants.OP_STATUS_CANCELED:
277
        status = constants.OP_STATUS_CANCELED
278
        break
279

    
280
    if all_success:
281
      status = constants.JOB_STATUS_SUCCESS
282

    
283
    return status
284

    
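  # Worked example (added for clarity, not in the original source): for a job
  # whose opcode statuses are [SUCCESS, RUNNING, QUEUED], the loop above skips
  # the successful opcode, clears all_success, and ends up with
  # JOB_STATUS_RUNNING.  With [SUCCESS, ERROR, QUEUED] it stops at the failed
  # opcode and the whole job reports JOB_STATUS_ERROR; only when every opcode
  # is SUCCESS does the job report JOB_STATUS_SUCCESS.
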
  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries


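# Illustrative sketch (added, not in the original source): each log entry is
# the tuple appended by the worker's _Log helper below, i.e.
# (log_serial, (seconds, microseconds), log_type, log_msg).  Passing the last
# serial a client has already seen filters out everything older:
#
#   job.GetLogEntries(None)  # all entries
#   job.GetLogEntries(3)     # only entries with serial 4 and up
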
class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def _NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the
    LU is finally about to start the Exec() method. Of course, to have
    end-user visible results, the opcode must be initially (before
    calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    """
    assert self.queue, "Queue attribute is missing"
    assert self.opcode, "Opcode attribute is missing"

    self.queue.acquire()
    try:
      assert self.opcode.status in (constants.OP_STATUS_WAITLOCK,
                                    constants.OP_STATUS_CANCELING)

      # Cancel here if we were asked to
      if self.opcode.status == constants.OP_STATUS_CANCELING:
        raise CancelJob()

      self.opcode.status = constants.OP_STATUS_RUNNING
    finally:
      self.queue.release()

  def RunTask(self, job):
    """Job executor.

    This function processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    logging.info("Worker %s processing job %s",
                  self.worker_id, job.id)
    proc = mcpu.Processor(self.pool.queue.context)
    self.queue = queue = job.queue
    try:
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          op_summary = op.input.Summary()
          try:
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
                         op_summary)

            queue.acquire()
            try:
              if op.status == constants.OP_STATUS_CANCELED:
                raise CancelJob()
              assert op.status == constants.OP_STATUS_QUEUED
              job.run_op_index = idx
              op.status = constants.OP_STATUS_WAITLOCK
              op.result = None
              op.start_timestamp = TimeStampNow()
              if idx == 0: # first opcode
                job.start_timestamp = op.start_timestamp
              queue.UpdateJobUnlocked(job)

              input_opcode = op.input
            finally:
              queue.release()

            def _Log(*args):
              """Append a log entry.

              """
              assert len(args) < 3

              if len(args) == 1:
                log_type = constants.ELOG_MESSAGE
                log_msg = args[0]
              else:
                log_type, log_msg = args

              # The time is split to make serialization easier and not lose
              # precision.
              timestamp = utils.SplitTime(time.time())

              queue.acquire()
              try:
                job.log_serial += 1
                op.log.append((job.log_serial, timestamp, log_type, log_msg))

                job.change.notifyAll()
              finally:
                queue.release()

            # Make sure not to hold lock while _Log is called
            self.opcode = op
            result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)

            queue.acquire()
            try:
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
              op.end_timestamp = TimeStampNow()
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.info("Op %s/%s: Successfully finished opcode %s",
                         idx + 1, count, op_summary)
          except CancelJob:
            # Will be handled further up
            raise
          except Exception, err:
            queue.acquire()
            try:
              try:
                op.status = constants.OP_STATUS_ERROR
                op.result = str(err)
                op.end_timestamp = TimeStampNow()
                logging.info("Op %s/%s: Error in opcode %s: %s",
                             idx + 1, count, op_summary, err)
              finally:
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

      except CancelJob:
        queue.acquire()
        try:
          queue.CancelJobUnlocked(job)
        finally:
          queue.release()
      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
    finally:
      queue.acquire()
      try:
        try:
          job.run_op_index = -1
          job.end_timestamp = TimeStampNow()
          queue.UpdateJobUnlocked(job)
        finally:
          job_id = job.id
          status = job.CalcStatus()
      finally:
        queue.release()
      logging.info("Worker %s finished job %s, status = %s",
                   self.worker_id, job_id, status)


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


class JobQueue(object):
  """Queue used to manage the jobs.

  @cvar _RE_JOB_FILE: regex matching the valid job file names

  """
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def _RequireOpenQueue(fn):
    """Decorator for "public" functions.

    This function should be used for all 'public' functions. That is,
    functions usually called from other classes.

    @warning: Use this decorator only after utils.LockedMethod!

    Example::
      @utils.LockedMethod
      @_RequireOpenQueue
      def Example(self):
        pass

    """
    def wrapper(self, *args, **kwargs):
      assert self._queue_lock is not None, "Queue should be open"
      return fn(self, *args, **kwargs)
    return wrapper

  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = utils.HostInfo().name

    # Locking
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    try:
      del self._nodes[self._my_hostname]
    except KeyError:
      pass

    # TODO: Check consistency across nodes

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      # We need to lock here because WorkerPool.AddTask() may start a job while
      # we're still doing our work.
      self.acquire()
      try:
        logging.info("Inspecting job queue")

        all_job_ids = self._GetJobIDsUnlocked()
        jobs_count = len(all_job_ids)
        lastinfo = time.time()
        for idx, job_id in enumerate(all_job_ids):
          # Give an update every 1000 jobs or 10 seconds
          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
              idx == (jobs_count - 1)):
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
            lastinfo = time.time()

          job = self._LoadJobUnlocked(job_id)

          # a failure in loading the job can cause 'None' to be returned
          if job is None:
            continue

          status = job.CalcStatus()

          if status in (constants.JOB_STATUS_QUEUED, ):
            self._wpool.AddTask(job)

          elif status in (constants.JOB_STATUS_RUNNING,
                          constants.JOB_STATUS_WAITLOCK,
                          constants.JOB_STATUS_CANCELING):
            logging.warning("Unfinished job %s found: %s", job.id, job)
            try:
              for op in job.ops:
                op.status = constants.OP_STATUS_ERROR
                op.result = "Unclean master daemon shutdown"
            finally:
              self.UpdateJobUnlocked(job)

        logging.info("Job queue inspection finished")
      finally:
        self.release()
    except:
      self._wpool.TerminateWorkers()
      raise

  @utils.LockedMethod
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
    msg = result.RemoteFailMsg()
    if msg:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      node_name, msg)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      # Read file content
      fd = open(file_name, "r")
      try:
        content = fd.read()
      finally:
        fd.close()

      result = rpc.RpcRunner.call_jobqueue_update([node_name],
                                                  [node.primary_ip],
                                                  file_name, content)
      msg = result[node_name].RemoteFailMsg()
      if msg:
        logging.error("Failed to upload file %s to node %s: %s",
                      file_name, node_name, msg)

    self._nodes[node_name] = node.primary_ip

  @utils.LockedMethod
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    try:
      # The queue is removed by the "leave node" RPC call.
      del self._nodes[node_name]
    except KeyError:
      pass

  def _CheckRpcResult(self, result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our RPC calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      msg = result[node].RemoteFailMsg()
      if msg:
        failed.append(node)
        logging.error("RPC call %s failed on node %s: %s",
                      result[node].call, node, msg)
      else:
        success.append(node)

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _WriteAndReplicateFileUnlocked(self, file_name, data):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file

    """
    utils.WriteFile(file_name, data=data)

    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
    self._CheckRpcResult(result, self._nodes,
                         "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames files locally and then replicates the changes.

    This function will rename files in the local queue directory
    and then replicate these renames to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

  def _FormatJobID(self, job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this will
    abstract this change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id

    """
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

  @classmethod
  def _GetArchiveDirectory(cls, job_id):
    """Returns the archive directory for a job.

    @type job_id: str
    @param job_id: Job identifier
    @rtype: str
    @return: Directory name

    """
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)

  def _NewSerialUnlocked(self):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    @rtype: str
    @return: a string representing the job identifier.

    """
    # New number
    serial = self._last_serial + 1

    # Write to file
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                        "%s\n" % serial)

    # Keep it only if we were able to write the file
    self._last_serial = serial

    return self._FormatJobID(serial)

  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)

  @classmethod
  def _GetArchivedJobPath(cls, job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    path = "%s/job-%s" % (cls._GetArchiveDirectory(job_id), job_id)
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, path)

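  # Worked example (added for illustration, not in the original source): with
  # JOBS_PER_ARCHIVE_DIRECTORY = 10000, job id "12345" maps to archive
  # directory "1", so _GetArchivedJobPath("12345") yields
  # os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "1/job-12345"), while the
  # live job file from _GetJobPath("12345") is
  # os.path.join(constants.QUEUE_DIR, "job-12345").
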
  @classmethod
  def _ExtractJobID(cls, name):
    """Extract the job id from a filename.

    @type name: str
    @param name: the job filename
    @rtype: job id or None
    @return: the job id corresponding to the given filename,
        or None if the filename does not represent a valid
        job file

    """
    m = cls._RE_JOB_FILE.match(name)
    if m:
      return m.group(1)
    else:
      return None

  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived jobs IDs will be
    included. Currently this argument is unused.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @rtype: list
    @return: the list of job IDs

    """
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
    jlist = utils.NiceSort(jlist)
    return jlist

  def _ListJobFiles(self):
    """Returns the list of current job files.

    @rtype: list
    @return: the list of job file names

    """
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
            if self._RE_JOB_FILE.match(name)]

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      return job

    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      fd = open(filepath, "r")
    except IOError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise
    try:
      data = serializer.LoadJson(fd.read())
    finally:
      fd.close()

    try:
      job = _QueuedJob.Restore(self, data)
    except Exception, err:
      new_path = self._GetArchivedJobPath(job_id)
      if filepath == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(filepath, new_path)])
      return None

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _GetJobsUnlocked(self, job_ids):
    """Return a list of jobs based on their IDs.

    @type job_ids: list
    @param job_ids: either an empty list (meaning all jobs),
        or a list of job IDs
    @rtype: list
    @return: the list of job objects

    """
    if not job_ids:
      job_ids = self._GetJobIDsUnlocked()

    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]

  @staticmethod
  def _IsQueueMarkedDrain():
    """Check if the queue is marked for drain.

    This currently uses the queue drain file, which makes it a
    per-node flag. In the future this can be moved to the config file.

    @rtype: boolean
    @return: True if the job queue is marked for draining

    """
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)

  @staticmethod
  def SetDrainFlag(drain_flag):
    """Sets the drain flag for the queue.

    This is similar to the function L{backend.JobQueueSetDrainFlag},
    and in the future we might merge them.

    @type drain_flag: boolean
    @param drain_flag: whether to set or unset the drain flag

    """
    if drain_flag:
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
    else:
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
    return True

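  # Illustrative sketch (added, not in the original source): the drain flag is
  # just the presence of an empty file, so draining and undraining the queue
  # amounts to:
  #
  #   JobQueue.SetDrainFlag(True)   # creates JOB_QUEUE_DRAIN_FILE
  #   JobQueue.SetDrainFlag(False)  # removes it again
  #
  # While the file exists, _SubmitJobUnlocked below refuses new jobs with
  # errors.JobQueueDrainError.
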
  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: job ID
    @return: the job ID of the newly created job
    @raise errors.JobQueueDrainError: if the job queue is marked for draining

    """
    if self._IsQueueMarkedDrain():
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    # Check job queue size
    size = len(self._ListJobFiles())
    if size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
      # TODO: Autoarchive jobs. Make sure it's not done on every job
      # submission, though.
      #size = ...
      pass

    if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    # Get job identifier
    job_id = self._NewSerialUnlocked()
    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self.UpdateJobUnlocked(job)

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    # Add to worker pool
    self._wpool.AddTask(job)

    return job.id

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    @see: L{_SubmitJobUnlocked}

    """
    return self._SubmitJobUnlocked(ops)

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
    for ops in jobs:
      try:
        data = self._SubmitJobUnlocked(ops)
        status = True
      except errors.GenericError, err:
        data = str(err)
        status = False
      results.append((status, data))

    return results


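  # Illustrative sketch (added, not in the original source): SubmitManyJobs
  # returns one (status, data) pair per requested job, where data is the new
  # job ID on success and the error message otherwise, e.g.:
  #
  #   results = queue.SubmitManyJobs([[op1], [op2, op3]])
  #   # e.g. [(True, "42"), (False, "Job queue is drained, refusing job")]
  #
  # Here 'queue' and the op* names are placeholders for a JobQueue instance
  # and opcodes.OpCode instances.
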
  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job

    """
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._WriteAndReplicateFileUnlocked(filename, data)

    # Notify waiters about potential changes
    job.change.notifyAll()

  @utils.LockedMethod
  @_RequireOpenQueue
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    logging.debug("Waiting for changes in job %s", job_id)
    end_time = time.time() + timeout
    while True:
      delta_time = end_time - time.time()
      if delta_time < 0:
        return constants.JOB_NOTCHANGED

      job = self._LoadJobUnlocked(job_id)
      if not job:
        logging.debug("Job %s not found", job_id)
        break

      status = job.CalcStatus()
      job_info = self._GetJobInfoUnlocked(job, fields)
      log_entries = job.GetLogEntries(prev_log_serial)

      # Serializing and deserializing data can cause type changes (e.g. from
      # tuple to list) or precision loss. We're doing it here so that we get
      # the same modifications as the data received from the client. Without
      # this, the comparison afterwards might fail without the data being
      # significantly different.
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

      if status not in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_RUNNING,
                        constants.JOB_STATUS_WAITLOCK):
        # Don't even try to wait if the job is no longer running, there will be
        # no changes.
        break

      if (prev_job_info != job_info or
          (log_entries and prev_log_serial != log_entries[0][0])):
        break

      logging.debug("Waiting again")

      # Release the queue lock while waiting
      job.change.wait(delta_time)

    logging.debug("Job %s changed", job_id)

    return (job_info, log_entries)

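  # Hypothetical caller sketch (added for illustration, not in the original
  # source): a poller would typically loop on this method, feeding back the
  # values it last saw, until the returned status shows a finished job:
  #
  #   prev_info, prev_serial = None, 0
  #   while True:
  #     result = queue.WaitForJobChanges("42", ["status"], prev_info,
  #                                      prev_serial, 60.0)
  #     if result == constants.JOB_NOTCHANGED:
  #       continue  # timed out, poll again
  #     prev_info, log_entries = result
  #     if log_entries:
  #       prev_serial = log_entries[-1][0]
  #     # stop once prev_info reports a finished status
  #
  # The 'queue' name and the literal job id are placeholders.
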
  @utils.LockedMethod
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    job_status = job.CalcStatus()

    if job_status not in (constants.JOB_STATUS_QUEUED,
                          constants.JOB_STATUS_WAITLOCK):
      logging.debug("Job %s is no longer in the queue", job.id)
      return (False, "Job %s is no longer in the queue" % job.id)

    if job_status == constants.JOB_STATUS_QUEUED:
      self.CancelJobUnlocked(job)
      return (True, "Job %s canceled" % job.id)

    elif job_status == constants.JOB_STATUS_WAITLOCK:
      # The worker will notice the new status and cancel the job
      try:
        for op in job.ops:
          op.status = constants.OP_STATUS_CANCELING
      finally:
        self.UpdateJobUnlocked(job)
      return (True, "Job %s will be canceled" % job.id)

  @_RequireOpenQueue
  def CancelJobUnlocked(self, job):
    """Marks a job as canceled.

    """
    try:
      for op in job.ops:
        op.status = constants.OP_STATUS_CANCELED
        op.result = "Job canceled by request"
    finally:
      self.UpdateJobUnlocked(job)

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
                                  constants.JOB_STATUS_SUCCESS,
                                  constants.JOB_STATUS_ERROR):
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  ", ".join(job.id for job in archive_jobs))

    return len(archive_jobs)

  @utils.LockedMethod
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @utils.LockedMethod
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
    @param timeout: the maximum amount of time to spend archiving, in seconds

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked(archived=False)
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched - 1)

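  # Worked example (added for illustration, not in the original source): with
  # age=3600 a job is archived only if its reference timestamp (end, else
  # start, else received) is more than an hour old; timestamps are
  # (seconds, microseconds) tuples, so only the seconds part, job_age[0], is
  # compared.  Calling AutoArchiveJobs(-1, timeout) archives every finished
  # job regardless of age.
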
  def _GetJobInfoUnlocked(self, job, fields):
    """Returns information about a job.

    @type job: L{_QueuedJob}
    @param job: the job which we query
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        has been passed

    """
    row = []
    for fname in fields:
      if fname == "id":
        row.append(job.id)
      elif fname == "status":
        row.append(job.CalcStatus())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in job.ops])
      elif fname == "opresult":
        row.append([op.result for op in job.ops])
      elif fname == "opstatus":
        row.append([op.status for op in job.ops])
      elif fname == "oplog":
        row.append([op.log for op in job.ops])
      elif fname == "opstart":
        row.append([op.start_timestamp for op in job.ops])
      elif fname == "opend":
        row.append([op.end_timestamp for op in job.ops])
      elif fname == "received_ts":
        row.append(job.received_timestamp)
      elif fname == "start_ts":
        row.append(job.start_timestamp)
      elif fname == "end_ts":
        row.append(job.end_timestamp)
      elif fname == "summary":
        row.append([op.input.Summary() for op in job.ops])
      else:
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
    return row

  @utils.LockedMethod
  @_RequireOpenQueue
  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    This is a wrapper of L{_GetJobsUnlocked}, which actually does the
    processing for each job.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element per job, each element being a list
        with the requested fields

    """
    jobs = []

    for job in self._GetJobsUnlocked(job_ids):
      if job is None:
        jobs.append(None)
      else:
        jobs.append(self._GetJobInfoUnlocked(job, fields))

    return jobs

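  # Illustrative sketch (added, not in the original source): the supported
  # field names are exactly the ones handled in _GetJobInfoUnlocked above,
  # e.g.:
  #
  #   queue.QueryJobs(["42"], ["id", "status", "summary"])
  #   # -> [["42", constants.JOB_STATUS_SUCCESS, ["CLUSTER_VERIFY"]]]
  #
  # The job id and the summary string are placeholders; unknown jobs yield
  # None entries and unknown field names raise errors.OpExecError.
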
  @utils.LockedMethod
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_lock.Close()
    self._queue_lock = None