#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling.

Locking: there's a single, large lock in the L{JobQueue} class. It's
used by all other classes in this module.

@var JOBQUEUE_THREADS: the number of worker threads we start for
    processing jobs

"""

import os
import logging
import threading
import errno
import re
import time
import weakref

from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc


JOBQUEUE_THREADS = 25
JOBS_PER_ARCHIVE_DIRECTORY = 10000
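# Note: _GetArchiveDirectory below groups archived jobs into subdirectories
# holding at most JOBS_PER_ARCHIVE_DIRECTORY job files each.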


class CancelJob(Exception):
  """Special exception to cancel a job.

  """


def TimeStampNow():
  """Returns the current timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  return utils.SplitTime(time.time())
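
# Illustration for TimeStampNow (hypothetical values): a wall-clock time of
# 1230768000.25 seconds since the epoch yields the tuple (1230768000, 250000),
# i.e. whole seconds plus microseconds, as split by utils.SplitTime().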


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar end_timestamp: timestamp for the end of the execution

  """
  __slots__ = ["input", "status", "result", "log",
               "start_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, op):
    """Constructor for the _QueuedOpCode.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.end_timestamp = None

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      }
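
  # Illustrative serialized form (hypothetical values; the "input" payload is
  # whatever input.__getstate__() returns):
  #   {"input": {...}, "status": "queued", "result": None, "log": [],
  #    "start_timestamp": None, "end_timestamp": None}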


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type run_op_index: int
  @ivar run_op_index: the currently executing opcode, or -1 if
      we didn't yet start executing
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar change: a Condition variable we use for waiting for job changes

  """
  __slots__ = ["queue", "id", "ops", "run_op_index", "log_serial",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "change",
               "__weakref__"]

  def __init__(self, queue, job_id, ops):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: str
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes

    """
    if not ops:
      # TODO: use a better exception
      raise Exception("No opcodes")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.run_op_index = -1
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    # Condition to wait for changes
    self.change = threading.Condition(self.queue._lock)

  @classmethod
  def Restore(cls, queue, state):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.run_op_index = state["run_op_index"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    # Condition to wait for changes
    obj.change = threading.Condition(obj.queue._lock)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "run_op_index": self.run_op_index,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled, or finished-with-error opcode, the job
        status will be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
          - canceling
          - running

        will determine the job status

      - otherwise, it means either all opcodes are queued, or success,
        and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status
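
  # Worked example (hypothetical opcode states): for opcode statuses
  # [SUCCESS, RUNNING, QUEUED] the loop above leaves the job status at
  # JOB_STATUS_RUNNING; a single ERROR or CANCELED opcode breaks out of
  # the loop and the job takes that status instead.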

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries
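
  # Example: if the job's opcodes hold log entries with serials 1 and 2,
  # GetLogEntries(1) returns only the entries with serial > 1, while
  # GetLogEntries(None) returns all of them.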


class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def _NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the
    LU is finally about to start the Exec() method. Of course, to have
    end-user visible results, the opcode must be initially (before
    calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    """
    assert self.queue, "Queue attribute is missing"
    assert self.opcode, "Opcode attribute is missing"

    self.queue.acquire()
    try:
      assert self.opcode.status in (constants.OP_STATUS_WAITLOCK,
                                    constants.OP_STATUS_CANCELING)

      # Cancel here if we were asked to
      if self.opcode.status == constants.OP_STATUS_CANCELING:
        raise CancelJob()

      self.opcode.status = constants.OP_STATUS_RUNNING
    finally:
      self.queue.release()

  def RunTask(self, job):
    """Job executor.

    This function processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    logging.info("Worker %s processing job %s",
                 self.worker_id, job.id)
    proc = mcpu.Processor(self.pool.queue.context)
    self.queue = queue = job.queue
    try:
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          op_summary = op.input.Summary()
          try:
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
                         op_summary)

            queue.acquire()
            try:
              if op.status == constants.OP_STATUS_CANCELED:
                raise CancelJob()
              assert op.status == constants.OP_STATUS_QUEUED
              job.run_op_index = idx
              op.status = constants.OP_STATUS_WAITLOCK
              op.result = None
              op.start_timestamp = TimeStampNow()
              if idx == 0: # first opcode
                job.start_timestamp = op.start_timestamp
              queue.UpdateJobUnlocked(job)

              input_opcode = op.input
            finally:
              queue.release()

            def _Log(*args):
              """Append a log entry.

              """
              assert len(args) < 3

              if len(args) == 1:
                log_type = constants.ELOG_MESSAGE
                log_msg = args[0]
              else:
                log_type, log_msg = args

              # The time is split to make serialization easier and not lose
              # precision.
              timestamp = utils.SplitTime(time.time())

              queue.acquire()
              try:
                job.log_serial += 1
                op.log.append((job.log_serial, timestamp, log_type, log_msg))

                job.change.notifyAll()
              finally:
                queue.release()

            # Make sure not to hold lock while _Log is called
            self.opcode = op
            result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)

            queue.acquire()
            try:
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
              op.end_timestamp = TimeStampNow()
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.info("Op %s/%s: Successfully finished opcode %s",
                         idx + 1, count, op_summary)
          except CancelJob:
            # Will be handled further up
            raise
          except Exception, err:
            queue.acquire()
            try:
              try:
                op.status = constants.OP_STATUS_ERROR
                op.result = str(err)
                op.end_timestamp = TimeStampNow()
                logging.info("Op %s/%s: Error in opcode %s: %s",
                             idx + 1, count, op_summary, err)
              finally:
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

      except CancelJob:
        queue.acquire()
        try:
          queue.CancelJobUnlocked(job)
        finally:
          queue.release()
      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
    finally:
      queue.acquire()
      try:
        try:
          job.run_op_index = -1
          job.end_timestamp = TimeStampNow()
          queue.UpdateJobUnlocked(job)
        finally:
          job_id = job.id
          status = job.CalcStatus()
      finally:
        queue.release()
      logging.info("Worker %s finished job %s, status = %s",
                   self.worker_id, job_id, status)


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


class JobQueue(object):
  """Queue used to manage the jobs.

  @cvar _RE_JOB_FILE: regex matching the valid job file names

  """
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def _RequireOpenQueue(fn):
    """Decorator for "public" functions.

    This function should be used for all 'public' functions. That is,
    functions usually called from other classes.

    @warning: Use this decorator only after utils.LockedMethod!

    Example::
      @utils.LockedMethod
      @_RequireOpenQueue
      def Example(self):
        pass

    """
    def wrapper(self, *args, **kwargs):
      assert self._queue_lock is not None, "Queue should be open"
      return fn(self, *args, **kwargs)
    return wrapper

  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = utils.HostInfo().name

    # Locking
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    try:
      del self._nodes[self._my_hostname]
    except KeyError:
      pass

    # TODO: Check consistency across nodes

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      # We need to lock here because WorkerPool.AddTask() may start a job while
      # we're still doing our work.
      self.acquire()
      try:
        logging.info("Inspecting job queue")

        all_job_ids = self._GetJobIDsUnlocked()
        jobs_count = len(all_job_ids)
        lastinfo = time.time()
        for idx, job_id in enumerate(all_job_ids):
          # Give an update every 1000 jobs or 10 seconds
          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
              idx == (jobs_count - 1)):
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
            lastinfo = time.time()

          job = self._LoadJobUnlocked(job_id)

          # a failure in loading the job can cause 'None' to be returned
          if job is None:
            continue

          status = job.CalcStatus()

          if status in (constants.JOB_STATUS_QUEUED, ):
            self._wpool.AddTask(job)

          elif status in (constants.JOB_STATUS_RUNNING,
                          constants.JOB_STATUS_WAITLOCK,
                          constants.JOB_STATUS_CANCELING):
            logging.warning("Unfinished job %s found: %s", job.id, job)
            try:
              for op in job.ops:
                op.status = constants.OP_STATUS_ERROR
                op.result = "Unclean master daemon shutdown"
            finally:
              self.UpdateJobUnlocked(job)

        logging.info("Job queue inspection finished")
      finally:
        self.release()
    except:
      self._wpool.TerminateWorkers()
      raise

  @utils.LockedMethod
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    rpc.RpcRunner.call_jobqueue_purge(node_name)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      # Read file content
      fd = open(file_name, "r")
      try:
        content = fd.read()
      finally:
        fd.close()

      result = rpc.RpcRunner.call_jobqueue_update([node_name],
                                                  [node.primary_ip],
                                                  file_name, content)
      if not result[node_name]:
        logging.error("Failed to upload %s to %s", file_name, node_name)

    self._nodes[node_name] = node.primary_ip

  @utils.LockedMethod
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    try:
      # The queue is removed by the "leave node" RPC call.
      del self._nodes[node_name]
    except KeyError:
      pass

  def _CheckRpcResult(self, result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our rpc calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      if result[node]:
        success.append(node)
      else:
        failed.append(node)

    if failed:
      logging.error("%s failed on %s", failmsg, ", ".join(failed))

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _WriteAndReplicateFileUnlocked(self, file_name, data):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file

    """
    utils.WriteFile(file_name, data=data)

    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
    self._CheckRpcResult(result, self._nodes,
                         "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames files locally and then replicates the changes.

    This function will rename files in the local queue directory
    and then replicate these renames to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

  def _FormatJobID(self, job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this will
    abstract this change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id

    """
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

  @classmethod
  def _GetArchiveDirectory(cls, job_id):
    """Returns the archive directory for a job.

    @type job_id: str
    @param job_id: Job identifier
    @rtype: str
    @return: Directory name

    """
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
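
  # Example: with JOBS_PER_ARCHIVE_DIRECTORY = 10000, job "123456" falls
  # into archive directory "12" (integer division of the numeric job id).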

  def _NewSerialUnlocked(self):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    @rtype: str
    @return: a string representing the job identifier.

    """
    # New number
    serial = self._last_serial + 1

    # Write to file
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                        "%s\n" % serial)

    # Keep it only if we were able to write the file
    self._last_serial = serial

    return self._FormatJobID(serial)

  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)

  @classmethod
  def _GetArchivedJobPath(cls, job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    path = "%s/job-%s" % (cls._GetArchiveDirectory(job_id), job_id)
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, path)
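
  # Example (path shape only; the archive root is taken from
  # constants.JOB_QUEUE_ARCHIVE_DIR): job "123456" is archived as
  # <JOB_QUEUE_ARCHIVE_DIR>/12/job-123456.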

  @classmethod
  def _ExtractJobID(cls, name):
    """Extract the job id from a filename.

    @type name: str
    @param name: the job filename
    @rtype: job id or None
    @return: the job id corresponding to the given filename,
        or None if the filename does not represent a valid
        job file

    """
    m = cls._RE_JOB_FILE.match(name)
    if m:
      return m.group(1)
    else:
      return None
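
  # Example: _ExtractJobID("job-123") returns "123"; a file name that does
  # not match _RE_JOB_FILE (e.g. "job-123.tmp") yields None.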

  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived job IDs will be
    included. Currently this argument is unused.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @rtype: list
    @return: the list of job IDs

    """
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
    jlist = utils.NiceSort(jlist)
    return jlist

  def _ListJobFiles(self):
    """Returns the list of current job files.

    @rtype: list
    @return: the list of job file names

    """
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
            if self._RE_JOB_FILE.match(name)]

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      return job

    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      fd = open(filepath, "r")
    except IOError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise
    try:
      data = serializer.LoadJson(fd.read())
    finally:
      fd.close()

    try:
      job = _QueuedJob.Restore(self, data)
    except Exception, err:
      new_path = self._GetArchivedJobPath(job_id)
      if filepath == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(filepath, new_path)])
      return None

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _GetJobsUnlocked(self, job_ids):
    """Return a list of jobs based on their IDs.

    @type job_ids: list
    @param job_ids: either an empty list (meaning all jobs),
        or a list of job IDs
    @rtype: list
    @return: the list of job objects

    """
    if not job_ids:
      job_ids = self._GetJobIDsUnlocked()

    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]

  @staticmethod
  def _IsQueueMarkedDrain():
    """Check if the queue is marked for drain.

    This currently uses the queue drain file, which makes it a
    per-node flag. In the future this can be moved to the config file.

    @rtype: boolean
    @return: True if the job queue is marked for draining

    """
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)

  @staticmethod
  def SetDrainFlag(drain_flag):
    """Sets the drain flag for the queue.

    This is similar to the function L{backend.JobQueueSetDrainFlag},
    and in the future we might merge them.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    if drain_flag:
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
    else:
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
    return True
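
  # The drain flag is simply the presence of the (empty) drain file:
  # SetDrainFlag(True) creates it, SetDrainFlag(False) removes it, and
  # _IsQueueMarkedDrain() above only tests for its existence.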

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: job ID
    @return: the job ID of the newly created job
    @raise errors.JobQueueDrainError: if the queue is marked for draining

    """
    if self._IsQueueMarkedDrain():
      raise errors.JobQueueDrainError()

    # Check job queue size
    size = len(self._ListJobFiles())
    if size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
      # TODO: Autoarchive jobs. Make sure it's not done on every job
      # submission, though.
      #size = ...
      pass

    if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    # Get job identifier
    job_id = self._NewSerialUnlocked()
    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self.UpdateJobUnlocked(job)

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    # Add to worker pool
    self._wpool.AddTask(job)

    return job.id

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job

    """
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._WriteAndReplicateFileUnlocked(filename, data)

    # Notify waiters about potential changes
    job.change.notifyAll()

  @utils.LockedMethod
  @_RequireOpenQueue
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    logging.debug("Waiting for changes in job %s", job_id)
    end_time = time.time() + timeout
    while True:
      delta_time = end_time - time.time()
      if delta_time < 0:
        return constants.JOB_NOTCHANGED

      job = self._LoadJobUnlocked(job_id)
      if not job:
        logging.debug("Job %s not found", job_id)
        break

      status = job.CalcStatus()
      job_info = self._GetJobInfoUnlocked(job, fields)
      log_entries = job.GetLogEntries(prev_log_serial)

      # Serializing and deserializing data can cause type changes (e.g. from
      # tuple to list) or precision loss. We're doing it here so that we get
      # the same modifications as the data received from the client. Without
      # this, the comparison afterwards might fail without the data being
      # significantly different.
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
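
      # For instance, a timestamp stored as the tuple (1230768000, 250000)
      # comes back from this JSON round-trip as the list [1230768000, 250000],
      # matching what a client would have received earlier.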

      if status not in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_RUNNING,
                        constants.JOB_STATUS_WAITLOCK):
        # Don't even try to wait if the job is no longer running, there will be
        # no changes.
        break

      if (prev_job_info != job_info or
          (log_entries and prev_log_serial != log_entries[0][0])):
        break

      logging.debug("Waiting again")

      # Release the queue lock while waiting
      job.change.wait(delta_time)

    logging.debug("Job %s changed", job_id)

    return (job_info, log_entries)

  @utils.LockedMethod
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started executing yet,
    i.e. it is still queued or waiting for locks.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    job_status = job.CalcStatus()

    if job_status not in (constants.JOB_STATUS_QUEUED,
                          constants.JOB_STATUS_WAITLOCK):
      logging.debug("Job %s is no longer in the queue", job.id)
      return (False, "Job %s is no longer in the queue" % job.id)

    if job_status == constants.JOB_STATUS_QUEUED:
      self.CancelJobUnlocked(job)
      return (True, "Job %s canceled" % job.id)

    elif job_status == constants.JOB_STATUS_WAITLOCK:
      # The worker will notice the new status and cancel the job
      try:
        for op in job.ops:
          op.status = constants.OP_STATUS_CANCELING
      finally:
        self.UpdateJobUnlocked(job)
      return (True, "Job %s will be canceled" % job.id)

  @_RequireOpenQueue
  def CancelJobUnlocked(self, job):
    """Marks a job as canceled.

    """
    try:
      for op in job.ops:
        op.status = constants.OP_STATUS_CANCELED
        op.result = "Job canceled by request"
    finally:
      self.UpdateJobUnlocked(job)

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
                                  constants.JOB_STATUS_SUCCESS,
                                  constants.JOB_STATUS_ERROR):
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  ", ".join(job.id for job in archive_jobs))

    return len(archive_jobs)

  @utils.LockedMethod
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @utils.LockedMethod
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: int
    @param timeout: the maximum time (in seconds) to spend archiving jobs
    @rtype: tuple
    @return: a tuple of (number of archived jobs, number of jobs not
        yet inspected because the timeout was reached)

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked(archived=False)
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched - 1)

  def _GetJobInfoUnlocked(self, job, fields):
    """Returns information about a job.

    @type job: L{_QueuedJob}
    @param job: the job which we query
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        has been passed

    """
    row = []
    for fname in fields:
      if fname == "id":
        row.append(job.id)
      elif fname == "status":
        row.append(job.CalcStatus())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in job.ops])
      elif fname == "opresult":
        row.append([op.result for op in job.ops])
      elif fname == "opstatus":
        row.append([op.status for op in job.ops])
      elif fname == "oplog":
        row.append([op.log for op in job.ops])
      elif fname == "opstart":
        row.append([op.start_timestamp for op in job.ops])
      elif fname == "opend":
        row.append([op.end_timestamp for op in job.ops])
      elif fname == "received_ts":
        row.append(job.received_timestamp)
      elif fname == "start_ts":
        row.append(job.start_timestamp)
      elif fname == "end_ts":
        row.append(job.end_timestamp)
      elif fname == "summary":
        row.append([op.input.Summary() for op in job.ops])
      else:
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
    return row

  @utils.LockedMethod
  @_RequireOpenQueue
  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    This is a wrapper of L{_GetJobsUnlocked}, which actually does the
    processing for each job.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: a list with one element per job, each element being a
        list with the requested fields

    """
    jobs = []

    for job in self._GetJobsUnlocked(job_ids):
      if job is None:
        jobs.append(None)
      else:
        jobs.append(self._GetJobInfoUnlocked(job, fields))

    return jobs
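
  # Example call (illustrative field values): QueryJobs(["1", "2"],
  # ["id", "status"]) might return [["1", "success"], ["2", "running"]],
  # with None in place of any job that could not be loaded.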

  @utils.LockedMethod
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_lock.Close()
    self._queue_lock = None