root / lib / jqueue.py @ 031a3e57
1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the job queue handling.
23

24
Locking: there's a single, large lock in the L{JobQueue} class. It's
25
used by all other classes in this module.
26

27
@var JOBQUEUE_THREADS: the number of worker threads we start for
28
    processing jobs
29

30
"""
31

    
32
import os
33
import logging
34
import threading
35
import errno
36
import re
37
import time
38
import weakref
39

    
40
from ganeti import constants
41
from ganeti import serializer
42
from ganeti import workerpool
43
from ganeti import opcodes
44
from ganeti import errors
45
from ganeti import mcpu
46
from ganeti import utils
47
from ganeti import jstore
48
from ganeti import rpc
49

    
50

    
51
JOBQUEUE_THREADS = 25
52
JOBS_PER_ARCHIVE_DIRECTORY = 10000
53

    
54

    
55
class CancelJob(Exception):
56
  """Special exception to cancel a job.
57

58
  """
59

    
60

    
61
def TimeStampNow():
62
  """Returns the current timestamp.
63

64
  @rtype: tuple
65
  @return: the current time in the (seconds, microseconds) format
66

67
  """
68
  return utils.SplitTime(time.time())
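
# Editor's note: a minimal sketch of the (seconds, microseconds) timestamp
# format returned by TimeStampNow() above. utils.SplitTime is Ganeti's own
# helper; the function below only illustrates the assumed shape of its result
# and is not the real implementation.
def _ExampleTimestampFormat(value=1234567890.25):
  """Illustrates splitting a float time into (seconds, microseconds)."""
  seconds = int(value)
  microseconds = int(round((value - seconds) * 1000000))
  # e.g. 1234567890.25 -> (1234567890, 250000); AutoArchiveJobs further down
  # only looks at the seconds element (job_age[0]) when comparing ages.
  return (seconds, microseconds)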
69

    
70

    
71
class _QueuedOpCode(object):
72
  """Encapsulates an opcode object.
73

74
  @ivar log: holds the execution log and consists of tuples
75
  of the form C{(log_serial, timestamp, level, message)}
76
  @ivar input: the OpCode we encapsulate
77
  @ivar status: the current status
78
  @ivar result: the result of the LU execution
79
  @ivar start_timestamp: timestamp for the start of the execution
80
  @ivar end_timestamp: timestamp for the end of the execution
81

82
  """
83
  __slots__ = ["input", "status", "result", "log",
84
               "start_timestamp", "end_timestamp",
85
               "__weakref__"]
86

    
87
  def __init__(self, op):
88
    """Constructor for the _QuededOpCode.
89

90
    @type op: L{opcodes.OpCode}
91
    @param op: the opcode we encapsulate
92

93
    """
94
    self.input = op
95
    self.status = constants.OP_STATUS_QUEUED
96
    self.result = None
97
    self.log = []
98
    self.start_timestamp = None
99
    self.end_timestamp = None
100

    
101
  @classmethod
102
  def Restore(cls, state):
103
    """Restore the _QueuedOpCode from the serialized form.
104

105
    @type state: dict
106
    @param state: the serialized state
107
    @rtype: _QueuedOpCode
108
    @return: a new _QueuedOpCode instance
109

110
    """
111
    obj = _QueuedOpCode.__new__(cls)
112
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
113
    obj.status = state["status"]
114
    obj.result = state["result"]
115
    obj.log = state["log"]
116
    obj.start_timestamp = state.get("start_timestamp", None)
117
    obj.end_timestamp = state.get("end_timestamp", None)
118
    return obj
119

    
120
  def Serialize(self):
121
    """Serializes this _QueuedOpCode.
122

123
    @rtype: dict
124
    @return: the dictionary holding the serialized state
125

126
    """
127
    return {
128
      "input": self.input.__getstate__(),
129
      "status": self.status,
130
      "result": self.result,
131
      "log": self.log,
132
      "start_timestamp": self.start_timestamp,
133
      "end_timestamp": self.end_timestamp,
134
      }
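
  # Editor's note: a sketch of the dictionary produced by Serialize() above,
  # with placeholder values. The "input" entry holds whatever
  # opcodes.OpCode.__getstate__() returns for the wrapped opcode (a
  # placeholder dict here); everything is JSON-friendly, which is what lets
  # UpdateJobUnlocked() dump whole jobs with serializer.DumpJson().
  @staticmethod
  def _ExampleSerializedState():
    """Returns a dict shaped like the output of Serialize()."""
    return {
      "input": {"OP_ID": "OP_EXAMPLE"},  # placeholder opcode state
      "status": constants.OP_STATUS_QUEUED,
      "result": None,
      "log": [],  # entries are (serial, timestamp, type, message) tuples
      "start_timestamp": None,
      "end_timestamp": None,
      }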
135

    
136

    
137
class _QueuedJob(object):
138
  """In-memory job representation.
139

140
  This is what we use to track the user-submitted jobs. Locking must
141
  be taken care of by users of this class.
142

143
  @type queue: L{JobQueue}
144
  @ivar queue: the parent queue
145
  @ivar id: the job ID
146
  @type ops: list
147
  @ivar ops: the list of _QueuedOpCode that constitute the job
148
  @type run_op_index: int
149
  @ivar run_op_index: the index of the currently executing opcode, or -1 if
150
      we didn't yet start executing
151
  @type log_serial: int
152
  @ivar log_serial: holds the index for the next log entry
153
  @ivar received_timestamp: the timestamp for when the job was received
154
  @ivar start_timestamp: the timestamp for start of execution
155
  @ivar end_timestamp: the timestamp for end of execution
156
  @ivar change: a Condition variable we use for waiting for job changes
157

158
  """
159
  __slots__ = ["queue", "id", "ops", "run_op_index", "log_serial",
160
               "received_timestamp", "start_timestamp", "end_timestamp",
161
               "change",
162
               "__weakref__"]
163

    
164
  def __init__(self, queue, job_id, ops):
165
    """Constructor for the _QueuedJob.
166

167
    @type queue: L{JobQueue}
168
    @param queue: our parent queue
169
    @type job_id: job ID
170
    @param job_id: our job id
171
    @type ops: list
172
    @param ops: the list of opcodes we hold, which will be encapsulated
173
        in _QueuedOpCodes
174

175
    """
176
    if not ops:
177
      # TODO: use a better exception
178
      raise Exception("No opcodes")
179

    
180
    self.queue = queue
181
    self.id = job_id
182
    self.ops = [_QueuedOpCode(op) for op in ops]
183
    self.run_op_index = -1
184
    self.log_serial = 0
185
    self.received_timestamp = TimeStampNow()
186
    self.start_timestamp = None
187
    self.end_timestamp = None
188

    
189
    # Condition to wait for changes
190
    self.change = threading.Condition(self.queue._lock)
191

    
192
  @classmethod
193
  def Restore(cls, queue, state):
194
    """Restore a _QueuedJob from serialized state:
195

196
    @type queue: L{JobQueue}
197
    @param queue: to which queue the restored job belongs
198
    @type state: dict
199
    @param state: the serialized state
200
    @rtype: _QueuedJob
201
    @return: the restored _QueuedJob instance
202

203
    """
204
    obj = _QueuedJob.__new__(cls)
205
    obj.queue = queue
206
    obj.id = state["id"]
207
    obj.run_op_index = state["run_op_index"]
208
    obj.received_timestamp = state.get("received_timestamp", None)
209
    obj.start_timestamp = state.get("start_timestamp", None)
210
    obj.end_timestamp = state.get("end_timestamp", None)
211

    
212
    obj.ops = []
213
    obj.log_serial = 0
214
    for op_state in state["ops"]:
215
      op = _QueuedOpCode.Restore(op_state)
216
      for log_entry in op.log:
217
        obj.log_serial = max(obj.log_serial, log_entry[0])
218
      obj.ops.append(op)
219

    
220
    # Condition to wait for changes
221
    obj.change = threading.Condition(obj.queue._lock)
222

    
223
    return obj
224

    
225
  def Serialize(self):
226
    """Serialize the _JobQueue instance.
227

228
    @rtype: dict
229
    @return: the serialized state
230

231
    """
232
    return {
233
      "id": self.id,
234
      "ops": [op.Serialize() for op in self.ops],
235
      "run_op_index": self.run_op_index,
236
      "start_timestamp": self.start_timestamp,
237
      "end_timestamp": self.end_timestamp,
238
      "received_timestamp": self.received_timestamp,
239
      }
240

    
241
  def CalcStatus(self):
242
    """Compute the status of this job.
243

244
    This function iterates over all the _QueuedOpCodes in the job and
245
    based on their status, computes the job status.
246

247
    The algorithm is:
248
      - if we find a cancelled opcode, or one finished with error, the job
249
        status will be the same
250
      - otherwise, the last opcode with the status one of:
251
          - waitlock
252
          - canceling
253
          - running
254

255
        will determine the job status
256

257
      - otherwise, it means either all opcodes are queued, or success,
258
        and the job status will be the same
259

260
    @return: the job status
261

262
    """
263
    status = constants.JOB_STATUS_QUEUED
264

    
265
    all_success = True
266
    for op in self.ops:
267
      if op.status == constants.OP_STATUS_SUCCESS:
268
        continue
269

    
270
      all_success = False
271

    
272
      if op.status == constants.OP_STATUS_QUEUED:
273
        pass
274
      elif op.status == constants.OP_STATUS_WAITLOCK:
275
        status = constants.JOB_STATUS_WAITLOCK
276
      elif op.status == constants.OP_STATUS_RUNNING:
277
        status = constants.JOB_STATUS_RUNNING
278
      elif op.status == constants.OP_STATUS_CANCELING:
279
        status = constants.JOB_STATUS_CANCELING
280
        break
281
      elif op.status == constants.OP_STATUS_ERROR:
282
        status = constants.JOB_STATUS_ERROR
283
        # The whole job fails if one opcode failed
284
        break
285
      elif op.status == constants.OP_STATUS_CANCELED:
286
        status = constants.JOB_STATUS_CANCELED
287
        break
288

    
289
    if all_success:
290
      status = constants.JOB_STATUS_SUCCESS
291

    
292
    return status
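
  # Editor's note: a standalone model of the precedence rules documented in
  # CalcStatus() above, using plain strings instead of the constants module
  # so it can be read and run in isolation. It illustrates the algorithm and
  # is not a replacement for the method.
  @staticmethod
  def _ExampleCalcStatus(op_statuses):
    """Computes a job status from a list of opcode status strings."""
    status = "queued"
    all_success = True
    for op_status in op_statuses:
      if op_status == "success":
        continue
      all_success = False
      if op_status == "queued":
        pass
      elif op_status == "waitlock":
        status = "waitlock"
      elif op_status == "running":
        status = "running"
      elif op_status == "canceling":
        status = "canceling"
        break
      elif op_status == "error":
        status = "error"
        break
      elif op_status == "canceled":
        status = "canceled"
        break
    if all_success:
      status = "success"
    return status

  # _ExampleCalcStatus(["success", "running", "queued"]) -> "running"
  # _ExampleCalcStatus(["success", "error", "queued"])   -> "error"
  # _ExampleCalcStatus(["success", "success"])           -> "success"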
293

    
294
  def GetLogEntries(self, newer_than):
295
    """Selectively returns the log entries.
296

297
    @type newer_than: None or int
298
    @param newer_than: if this is None, return all log entries,
299
        otherwise return only the log entries with serial higher
300
        than this value
301
    @rtype: list
302
    @return: the list of the log entries selected
303

304
    """
305
    if newer_than is None:
306
      serial = -1
307
    else:
308
      serial = newer_than
309

    
310
    entries = []
311
    for op in self.ops:
312
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))
313

    
314
    return entries
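
  # Editor's note: a standalone illustration of the filtering performed by
  # GetLogEntries() above. Log entries are (serial, timestamp, type, message)
  # tuples; only entries whose serial is greater than newer_than are
  # returned. The sample data is made up.
  @staticmethod
  def _ExampleFilterLogEntries():
    """Filters sample log entries the same way GetLogEntries() does."""
    log = [
      (1, (1234567890, 0), "message", "starting"),
      (2, (1234567891, 0), "message", "still working"),
      (3, (1234567892, 0), "message", "done"),
      ]
    newer_than = 1
    # Keeps the entries with serials 2 and 3
    return [entry for entry in log if entry[0] > newer_than]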
315

    
316
  def MarkUnfinishedOps(self, status, result):
317
    """Mark unfinished opcodes with a given status and result.
318

319
    This is a utility function for marking all running or waiting to
320
    be run opcodes with a given status. Opcodes which are already
321
    finalised are not changed.
322

323
    @param status: a given opcode status
324
    @param result: the opcode result
325

326
    """
327
    not_marked = True
328
    for op in self.ops:
329
      if op.status in constants.OPS_FINALIZED:
330
        assert not_marked, "Finalized opcodes found after non-finalized ones"
331
        continue
332
      op.status = status
333
      op.result = result
334
      not_marked = False
335

    
336

    
337
class _OpCodeExecCallbacks(mcpu.OpExecCbBase):
338
  def __init__(self, queue, job, op):
339
    """Initializes this class.
340

341
    @type queue: L{JobQueue}
342
    @param queue: Job queue
343
    @type job: L{_QueuedJob}
344
    @param job: Job object
345
    @type op: L{_QueuedOpCode}
346
    @param op: OpCode
347

348
    """
349
    assert queue, "Queue is missing"
350
    assert job, "Job is missing"
351
    assert op, "Opcode is missing"
352

    
353
    self._queue = queue
354
    self._job = job
355
    self._op = op
356

    
357
  def NotifyStart(self):
358
    """Mark the opcode as running, not lock-waiting.
359

360
    This is called from the mcpu code as a notifier function, when the LU is
361
    finally about to start the Exec() method. Of course, to have end-user
362
    visible results, the opcode must be initially (before calling into
363
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
364

365
    """
366
    self._queue.acquire()
367
    try:
368
      assert self._op.status in (constants.OP_STATUS_WAITLOCK,
369
                                 constants.OP_STATUS_CANCELING)
370

    
371
      # Cancel here if we were asked to
372
      if self._op.status == constants.OP_STATUS_CANCELING:
373
        raise CancelJob()
374

    
375
      self._op.status = constants.OP_STATUS_RUNNING
376
    finally:
377
      self._queue.release()
378

    
379
  def Feedback(self, *args):
380
    """Append a log entry.
381

382
    """
383
    assert len(args) < 3
384

    
385
    if len(args) == 1:
386
      log_type = constants.ELOG_MESSAGE
387
      log_msg = args[0]
388
    else:
389
      (log_type, log_msg) = args
390

    
391
    # The time is split to make serialization easier and not lose
392
    # precision.
393
    timestamp = utils.SplitTime(time.time())
394

    
395
    self._queue.acquire()
396
    try:
397
      self._job.log_serial += 1
398
      self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
399

    
400
      self._job.change.notifyAll()
401
    finally:
402
      self._queue.release()
403

    
404

    
405
class _JobQueueWorker(workerpool.BaseWorker):
406
  """The actual job workers.
407

408
  """
409
  def RunTask(self, job):
410
    """Job executor.
411

412
    This function processes a job. It is closely tied to the _QueuedJob and
413
    _QueuedOpCode classes.
414

415
    @type job: L{_QueuedJob}
416
    @param job: the job to be processed
417

418
    """
419
    logging.info("Worker %s processing job %s",
420
                  self.worker_id, job.id)
421
    proc = mcpu.Processor(self.pool.queue.context)
422
    queue = job.queue
423
    try:
424
      try:
425
        count = len(job.ops)
426
        for idx, op in enumerate(job.ops):
427
          op_summary = op.input.Summary()
428
          if op.status == constants.OP_STATUS_SUCCESS:
429
            # this is a job that was partially completed before master
430
            # daemon shutdown, so it can be expected that some opcodes
431
            # are already completed successfully (if any did error
432
            # out, then the whole job should have been aborted and not
433
            # resubmitted for processing)
434
            logging.info("Op %s/%s: opcode %s already processed, skipping",
435
                         idx + 1, count, op_summary)
436
            continue
437
          try:
438
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
439
                         op_summary)
440

    
441
            queue.acquire()
442
            try:
443
              if op.status == constants.OP_STATUS_CANCELED:
444
                raise CancelJob()
445
              assert op.status == constants.OP_STATUS_QUEUED
446
              job.run_op_index = idx
447
              op.status = constants.OP_STATUS_WAITLOCK
448
              op.result = None
449
              op.start_timestamp = TimeStampNow()
450
              if idx == 0: # first opcode
451
                job.start_timestamp = op.start_timestamp
452
              queue.UpdateJobUnlocked(job)
453

    
454
              input_opcode = op.input
455
            finally:
456
              queue.release()
457

    
458
            # Make sure not to hold queue lock while calling ExecOpCode
459
            result = proc.ExecOpCode(input_opcode,
460
                                     _OpCodeExecCallbacks(queue, job, op))
461

    
462
            queue.acquire()
463
            try:
464
              op.status = constants.OP_STATUS_SUCCESS
465
              op.result = result
466
              op.end_timestamp = TimeStampNow()
467
              queue.UpdateJobUnlocked(job)
468
            finally:
469
              queue.release()
470

    
471
            logging.info("Op %s/%s: Successfully finished opcode %s",
472
                         idx + 1, count, op_summary)
473
          except CancelJob:
474
            # Will be handled further up
475
            raise
476
          except Exception, err:
477
            queue.acquire()
478
            try:
479
              try:
480
                op.status = constants.OP_STATUS_ERROR
481
                if isinstance(err, errors.GenericError):
482
                  op.result = errors.EncodeException(err)
483
                else:
484
                  op.result = str(err)
485
                op.end_timestamp = TimeStampNow()
486
                logging.info("Op %s/%s: Error in opcode %s: %s",
487
                             idx + 1, count, op_summary, err)
488
              finally:
489
                queue.UpdateJobUnlocked(job)
490
            finally:
491
              queue.release()
492
            raise
493

    
494
      except CancelJob:
495
        queue.acquire()
496
        try:
497
          queue.CancelJobUnlocked(job)
498
        finally:
499
          queue.release()
500
      except errors.GenericError, err:
501
        logging.exception("Ganeti exception")
502
      except:
503
        logging.exception("Unhandled exception")
504
    finally:
505
      queue.acquire()
506
      try:
507
        try:
508
          job.run_op_index = -1
509
          job.end_timestamp = TimeStampNow()
510
          queue.UpdateJobUnlocked(job)
511
        finally:
512
          job_id = job.id
513
          status = job.CalcStatus()
514
      finally:
515
        queue.release()
516
      logging.info("Worker %s finished job %s, status = %s",
517
                   self.worker_id, job_id, status)
518

    
519

    
520
class _JobQueueWorkerPool(workerpool.WorkerPool):
521
  """Simple class implementing a job-processing workerpool.
522

523
  """
524
  def __init__(self, queue):
525
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
526
                                              _JobQueueWorker)
527
    self.queue = queue
528

    
529

    
530
class JobQueue(object):
531
  """Queue used to manage the jobs.
532

533
  @cvar _RE_JOB_FILE: regex matching the valid job file names
534

535
  """
536
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
537

    
538
  def _RequireOpenQueue(fn):
539
    """Decorator for "public" functions.
540

541
    This function should be used for all 'public' functions. That is,
542
    functions usually called from other classes.
543

544
    @warning: Use this decorator only after utils.LockedMethod!
545

546
    Example::
547
      @utils.LockedMethod
548
      @_RequireOpenQueue
549
      def Example(self):
550
        pass
551

552
    """
553
    def wrapper(self, *args, **kwargs):
554
      assert self._queue_lock is not None, "Queue should be open"
555
      return fn(self, *args, **kwargs)
556
    return wrapper
557

    
558
  def __init__(self, context):
559
    """Constructor for JobQueue.
560

561
    The constructor will initialize the job queue object and then
562
    start loading the current jobs from disk, either for starting them
563
    (if they were queued) or for aborting them (if they were already
564
    running).
565

566
    @type context: GanetiContext
567
    @param context: the context object for access to the configuration
568
        data and other ganeti objects
569

570
    """
571
    self.context = context
572
    self._memcache = weakref.WeakValueDictionary()
573
    self._my_hostname = utils.HostInfo().name
574

    
575
    # Locking
576
    self._lock = threading.Lock()
577
    self.acquire = self._lock.acquire
578
    self.release = self._lock.release
579

    
580
    # Initialize
581
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)
582

    
583
    # Read serial file
584
    self._last_serial = jstore.ReadSerial()
585
    assert self._last_serial is not None, ("Serial file was modified between"
586
                                           " check in jstore and here")
587

    
588
    # Get initial list of nodes
589
    self._nodes = dict((n.name, n.primary_ip)
590
                       for n in self.context.cfg.GetAllNodesInfo().values()
591
                       if n.master_candidate)
592

    
593
    # Remove master node
594
    try:
595
      del self._nodes[self._my_hostname]
596
    except KeyError:
597
      pass
598

    
599
    # TODO: Check consistency across nodes
600

    
601
    # Setup worker pool
602
    self._wpool = _JobQueueWorkerPool(self)
603
    try:
604
      # We need to lock here because WorkerPool.AddTask() may start a job while
605
      # we're still doing our work.
606
      self.acquire()
607
      try:
608
        logging.info("Inspecting job queue")
609

    
610
        all_job_ids = self._GetJobIDsUnlocked()
611
        jobs_count = len(all_job_ids)
612
        lastinfo = time.time()
613
        for idx, job_id in enumerate(all_job_ids):
614
          # Give an update every 1000 jobs or 10 seconds
615
          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
616
              idx == (jobs_count - 1)):
617
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
618
                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
619
            lastinfo = time.time()
620

    
621
          job = self._LoadJobUnlocked(job_id)
622

    
623
          # a failure in loading the job can cause 'None' to be returned
624
          if job is None:
625
            continue
626

    
627
          status = job.CalcStatus()
628

    
629
          if status in (constants.JOB_STATUS_QUEUED, ):
630
            self._wpool.AddTask(job)
631

    
632
          elif status in (constants.JOB_STATUS_RUNNING,
633
                          constants.JOB_STATUS_WAITLOCK,
634
                          constants.JOB_STATUS_CANCELING):
635
            logging.warning("Unfinished job %s found: %s", job.id, job)
636
            try:
637
              job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
638
                                    "Unclean master daemon shutdown")
639
            finally:
640
              self.UpdateJobUnlocked(job)
641

    
642
        logging.info("Job queue inspection finished")
643
      finally:
644
        self.release()
645
    except:
646
      self._wpool.TerminateWorkers()
647
      raise
648

    
649
  @utils.LockedMethod
650
  @_RequireOpenQueue
651
  def AddNode(self, node):
652
    """Register a new node with the queue.
653

654
    @type node: L{objects.Node}
655
    @param node: the node object to be added
656

657
    """
658
    node_name = node.name
659
    assert node_name != self._my_hostname
660

    
661
    # Clean queue directory on added node
662
    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
663
    msg = result.RemoteFailMsg()
664
    if msg:
665
      logging.warning("Cannot cleanup queue directory on node %s: %s",
666
                      node_name, msg)
667

    
668
    if not node.master_candidate:
669
      # remove if existing, ignoring errors
670
      self._nodes.pop(node_name, None)
671
      # and skip the replication of the job ids
672
      return
673

    
674
    # Upload the whole queue excluding archived jobs
675
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
676

    
677
    # Upload current serial file
678
    files.append(constants.JOB_QUEUE_SERIAL_FILE)
679

    
680
    for file_name in files:
681
      # Read file content
682
      content = utils.ReadFile(file_name)
683

    
684
      result = rpc.RpcRunner.call_jobqueue_update([node_name],
685
                                                  [node.primary_ip],
686
                                                  file_name, content)
687
      msg = result[node_name].RemoteFailMsg()
688
      if msg:
689
        logging.error("Failed to upload file %s to node %s: %s",
690
                      file_name, node_name, msg)
691

    
692
    self._nodes[node_name] = node.primary_ip
693

    
694
  @utils.LockedMethod
695
  @_RequireOpenQueue
696
  def RemoveNode(self, node_name):
697
    """Callback called when removing nodes from the cluster.
698

699
    @type node_name: str
700
    @param node_name: the name of the node to remove
701

702
    """
703
    try:
704
      # The queue is removed by the "leave node" RPC call.
705
      del self._nodes[node_name]
706
    except KeyError:
707
      pass
708

    
709
  def _CheckRpcResult(self, result, nodes, failmsg):
710
    """Verifies the status of an RPC call.
711

712
    Since we aim to keep consistency should this node (the current
713
    master) fail, we will log errors if our rpc calls fail, and especially
714
    log the case when more than half of the nodes fail.
715

716
    @param result: the data as returned from the rpc call
717
    @type nodes: list
718
    @param nodes: the list of nodes we made the call to
719
    @type failmsg: str
720
    @param failmsg: the identifier to be used for logging
721

722
    """
723
    failed = []
724
    success = []
725

    
726
    for node in nodes:
727
      msg = result[node].RemoteFailMsg()
728
      if msg:
729
        failed.append(node)
730
        logging.error("RPC call %s failed on node %s: %s",
731
                      result[node].call, node, msg)
732
      else:
733
        success.append(node)
734

    
735
    # +1 for the master node
736
    if (len(success) + 1) < len(failed):
737
      # TODO: Handle failing nodes
738
      logging.error("More than half of the nodes failed")
739

    
740
  def _GetNodeIp(self):
741
    """Helper for returning the node name/ip list.
742

743
    @rtype: (list, list)
744
    @return: a tuple of two lists, the first one with the node
745
        names and the second one with the node addresses
746

747
    """
748
    name_list = self._nodes.keys()
749
    addr_list = [self._nodes[name] for name in name_list]
750
    return name_list, addr_list
751

    
752
  def _WriteAndReplicateFileUnlocked(self, file_name, data):
753
    """Writes a file locally and then replicates it to all nodes.
754

755
    This function will replace the contents of a file on the local
756
    node and then replicate it to all the other nodes we have.
757

758
    @type file_name: str
759
    @param file_name: the path of the file to be replicated
760
    @type data: str
761
    @param data: the new contents of the file
762

763
    """
764
    utils.WriteFile(file_name, data=data)
765

    
766
    names, addrs = self._GetNodeIp()
767
    result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
768
    self._CheckRpcResult(result, self._nodes,
769
                         "Updating %s" % file_name)
770

    
771
  def _RenameFilesUnlocked(self, rename):
772
    """Renames a file locally and then replicate the change.
773

774
    This function will rename a file in the local queue directory
775
    and then replicate this rename to all the other nodes we have.
776

777
    @type rename: list of (old, new)
778
    @param rename: List containing tuples mapping old to new names
779

780
    """
781
    # Rename them locally
782
    for old, new in rename:
783
      utils.RenameFile(old, new, mkdir=True)
784

    
785
    # ... and on all nodes
786
    names, addrs = self._GetNodeIp()
787
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
788
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
789

    
790
  def _FormatJobID(self, job_id):
791
    """Convert a job ID to string format.
792

793
    Currently this just does C{str(job_id)} after performing some
794
    checks, but if we want to change the job id format this will
795
    abstract this change.
796

797
    @type job_id: int or long
798
    @param job_id: the numeric job id
799
    @rtype: str
800
    @return: the formatted job id
801

802
    """
803
    if not isinstance(job_id, (int, long)):
804
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
805
    if job_id < 0:
806
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)
807

    
808
    return str(job_id)
809

    
810
  @classmethod
811
  def _GetArchiveDirectory(cls, job_id):
812
    """Returns the archive directory for a job.
813

814
    @type job_id: str
815
    @param job_id: Job identifier
816
    @rtype: str
817
    @return: Directory name
818

819
    """
820
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
821

    
822
  def _NewSerialsUnlocked(self, count):
823
    """Generates a new job identifier.
824

825
    Job identifiers are unique during the lifetime of a cluster.
826

827
    @type count: integer
828
    @param count: how many serials to return
829
    @rtype: list of str
830
    @return: a list of formatted job identifiers.
831

832
    """
833
    assert count > 0
834
    # New number
835
    serial = self._last_serial + count
836

    
837
    # Write to file
838
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
839
                                        "%s\n" % serial)
840

    
841
    result = [self._FormatJobID(v)
842
              for v in range(self._last_serial + 1, serial + 1)]
843
    # Keep it only if we were able to write the file
844
    self._last_serial = serial
845

    
846
    return result
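
  # Editor's note: a worked example of the serial arithmetic performed by
  # _NewSerialsUnlocked() above, without the file write and replication.
  # With the last serial at 5 and count=3, the serial file is advanced to 8
  # and the identifiers "6", "7" and "8" are handed out.
  @staticmethod
  def _ExampleNewSerials(last_serial=5, count=3):
    """Mirrors the serial allocation; returns (new_last_serial, job_ids)."""
    serial = last_serial + count
    job_ids = [str(v) for v in range(last_serial + 1, serial + 1)]
    # -> (8, ["6", "7", "8"])
    return (serial, job_ids)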
847

    
848
  @staticmethod
849
  def _GetJobPath(job_id):
850
    """Returns the job file for a given job id.
851

852
    @type job_id: str
853
    @param job_id: the job identifier
854
    @rtype: str
855
    @return: the path to the job file
856

857
    """
858
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
859

    
860
  @classmethod
861
  def _GetArchivedJobPath(cls, job_id):
862
    """Returns the archived job file for a give job id.
863

864
    @type job_id: str
865
    @param job_id: the job identifier
866
    @rtype: str
867
    @return: the path to the archived job file
868

869
    """
870
    path = "%s/job-%s" % (cls._GetArchiveDirectory(job_id), job_id)
871
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, path)
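
  # Editor's note: a short sketch of the archive layout implied by
  # _GetArchiveDirectory() and _GetArchivedJobPath() above: jobs are grouped
  # into one subdirectory per JOBS_PER_ARCHIVE_DIRECTORY job IDs. Only the
  # path relative to constants.JOB_QUEUE_ARCHIVE_DIR is computed here.
  @staticmethod
  def _ExampleArchivedJobPath(job_id):
    """Returns the archive-relative path for a job ID given as a string."""
    archive_dir = str(int(job_id) // JOBS_PER_ARCHIVE_DIRECTORY)
    # "42" -> "0/job-42", "12345" -> "1/job-12345"
    return "%s/job-%s" % (archive_dir, job_id)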
872

    
873
  @classmethod
874
  def _ExtractJobID(cls, name):
875
    """Extract the job id from a filename.
876

877
    @type name: str
878
    @param name: the job filename
879
    @rtype: job id or None
880
    @return: the job id corresponding to the given filename,
881
        or None if the filename does not represent a valid
882
        job file
883

884
    """
885
    m = cls._RE_JOB_FILE.match(name)
886
    if m:
887
      return m.group(1)
888
    else:
889
      return None
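
  # Editor's note: an illustration of the file name matching done by
  # _ExtractJobID() above. The real pattern is built from
  # constants.JOB_ID_TEMPLATE; this sketch assumes job IDs are plain digit
  # strings, matching the numeric serials generated in this module.
  @staticmethod
  def _ExampleExtractJobID(name):
    """Returns the job ID from a queue file name, or None."""
    # "job-123" -> "123", "job-123.bak" -> None
    m = re.match(r"^job-(\d+)$", name)
    if m:
      return m.group(1)
    return None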
890

    
891
  def _GetJobIDsUnlocked(self, archived=False):
892
    """Return all known job IDs.
893

894
    If the parameter archived is True, archived job IDs will be
895
    included. Currently this argument is unused.
896

897
    The method only looks at disk because it's a requirement that all
898
    jobs are present on disk (so in the _memcache we don't have any
899
    extra IDs).
900

901
    @rtype: list
902
    @return: the list of job IDs
903

904
    """
905
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
906
    jlist = utils.NiceSort(jlist)
907
    return jlist
908

    
909
  def _ListJobFiles(self):
910
    """Returns the list of current job files.
911

912
    @rtype: list
913
    @return: the list of job file names
914

915
    """
916
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
917
            if self._RE_JOB_FILE.match(name)]
918

    
919
  def _LoadJobUnlocked(self, job_id):
920
    """Loads a job from the disk or memory.
921

922
    Given a job id, this will return the cached job object if
923
    existing, or try to load the job from the disk. If loading from
924
    disk, it will also add the job to the cache.
925

926
    @param job_id: the job id
927
    @rtype: L{_QueuedJob} or None
928
    @return: either None or the job object
929

930
    """
931
    job = self._memcache.get(job_id, None)
932
    if job:
933
      logging.debug("Found job %s in memcache", job_id)
934
      return job
935

    
936
    filepath = self._GetJobPath(job_id)
937
    logging.debug("Loading job from %s", filepath)
938
    try:
939
      raw_data = utils.ReadFile(filepath)
940
    except IOError, err:
941
      if err.errno in (errno.ENOENT, ):
942
        return None
943
      raise
944

    
945
    data = serializer.LoadJson(raw_data)
946

    
947
    try:
948
      job = _QueuedJob.Restore(self, data)
949
    except Exception, err:
950
      new_path = self._GetArchivedJobPath(job_id)
951
      if filepath == new_path:
952
        # job already archived (future case)
953
        logging.exception("Can't parse job %s", job_id)
954
      else:
955
        # non-archived case
956
        logging.exception("Can't parse job %s, will archive.", job_id)
957
        self._RenameFilesUnlocked([(filepath, new_path)])
958
      return None
959

    
960
    self._memcache[job_id] = job
961
    logging.debug("Added job %s to the cache", job_id)
962
    return job
963

    
964
  def _GetJobsUnlocked(self, job_ids):
965
    """Return a list of jobs based on their IDs.
966

967
    @type job_ids: list
968
    @param job_ids: either an empty list (meaning all jobs),
969
        or a list of job IDs
970
    @rtype: list
971
    @return: the list of job objects
972

973
    """
974
    if not job_ids:
975
      job_ids = self._GetJobIDsUnlocked()
976

    
977
    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
978

    
979
  @staticmethod
980
  def _IsQueueMarkedDrain():
981
    """Check if the queue is marked from drain.
982

983
    This currently uses the queue drain file, which makes it a
984
    per-node flag. In the future this can be moved to the config file.
985

986
    @rtype: boolean
987
    @return: True if the job queue is marked for draining
988

989
    """
990
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
991

    
992
  @staticmethod
993
  def SetDrainFlag(drain_flag):
994
    """Sets the drain flag for the queue.
995

996
    This is similar to the function L{backend.JobQueueSetDrainFlag},
997
    and in the future we might merge them.
998

999
    @type drain_flag: boolean
1000
    @param drain_flag: Whether to set or unset the drain flag
1001

1002
    """
1003
    if drain_flag:
1004
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
1005
    else:
1006
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
1007
    return True
1008

    
1009
  @_RequireOpenQueue
1010
  def _SubmitJobUnlocked(self, job_id, ops):
1011
    """Create and store a new job.
1012

1013
    This enters the job into our job queue and also puts it on the new
1014
    queue, in order for it to be picked up by the queue processors.
1015

1016
    @type job_id: job ID
1017
    @param job_id: the job ID for the new job
1018
    @type ops: list
1019
    @param ops: The list of OpCodes that will become the new job.
1020
    @rtype: job ID
1021
    @return: the job ID of the newly created job
1022
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
1023

1024
    """
1025
    if self._IsQueueMarkedDrain():
1026
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1027

    
1028
    # Check job queue size
1029
    size = len(self._ListJobFiles())
1030
    if size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
1031
      # TODO: Autoarchive jobs. Make sure it's not done on every job
1032
      # submission, though.
1033
      #size = ...
1034
      pass
1035

    
1036
    if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
1037
      raise errors.JobQueueFull()
1038

    
1039
    job = _QueuedJob(self, job_id, ops)
1040

    
1041
    # Write to disk
1042
    self.UpdateJobUnlocked(job)
1043

    
1044
    logging.debug("Adding new job %s to the cache", job_id)
1045
    self._memcache[job_id] = job
1046

    
1047
    # Add to worker pool
1048
    self._wpool.AddTask(job)
1049

    
1050
    return job.id
1051

    
1052
  @utils.LockedMethod
1053
  @_RequireOpenQueue
1054
  def SubmitJob(self, ops):
1055
    """Create and store a new job.
1056

1057
    @see: L{_SubmitJobUnlocked}
1058

1059
    """
1060
    job_id = self._NewSerialsUnlocked(1)[0]
1061
    return self._SubmitJobUnlocked(job_id, ops)
1062

    
1063
  @utils.LockedMethod
1064
  @_RequireOpenQueue
1065
  def SubmitManyJobs(self, jobs):
1066
    """Create and store multiple jobs.
1067

1068
    @see: L{_SubmitJobUnlocked}
1069

1070
    """
1071
    results = []
1072
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
1073
    for job_id, ops in zip(all_job_ids, jobs):
1074
      try:
1075
        data = self._SubmitJobUnlocked(job_id, ops)
1076
        status = True
1077
      except errors.GenericError, err:
1078
        data = str(err)
1079
        status = False
1080
      results.append((status, data))
1081

    
1082
    return results
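
  # Editor's note: a sketch of the per-job results returned by
  # SubmitManyJobs() above: one (status, data) pair per submitted job, where
  # data is the new job ID on success or an error message on failure. The
  # values below are purely illustrative.
  @staticmethod
  def _ExampleSubmitManyResults():
    """Returns a result list shaped like the output of SubmitManyJobs()."""
    return [
      (True, "17"),  # job accepted, job ID "17"
      (False, "Job queue is drained, refusing job"),  # submission refused
      ]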
1083

    
1084

    
1085
  @_RequireOpenQueue
1086
  def UpdateJobUnlocked(self, job):
1087
    """Update a job's on disk storage.
1088

1089
    After a job has been modified, this function needs to be called in
1090
    order to write the changes to disk and replicate them to the other
1091
    nodes.
1092

1093
    @type job: L{_QueuedJob}
1094
    @param job: the changed job
1095

1096
    """
1097
    filename = self._GetJobPath(job.id)
1098
    data = serializer.DumpJson(job.Serialize(), indent=False)
1099
    logging.debug("Writing job %s to %s", job.id, filename)
1100
    self._WriteAndReplicateFileUnlocked(filename, data)
1101

    
1102
    # Notify waiters about potential changes
1103
    job.change.notifyAll()
1104

    
1105
  @utils.LockedMethod
1106
  @_RequireOpenQueue
1107
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
1108
                        timeout):
1109
    """Waits for changes in a job.
1110

1111
    @type job_id: string
1112
    @param job_id: Job identifier
1113
    @type fields: list of strings
1114
    @param fields: Which fields to check for changes
1115
    @type prev_job_info: list or None
1116
    @param prev_job_info: Last job information returned
1117
    @type prev_log_serial: int
1118
    @param prev_log_serial: Last job message serial number
1119
    @type timeout: float
1120
    @param timeout: maximum time to wait
1121
    @rtype: tuple (job info, log entries)
1122
    @return: a tuple of the job information as required via
1123
        the fields parameter, and the log entries as a list
1124

1125
        if the job has not changed and the timeout has expired,
1126
        we instead return a special value,
1127
        L{constants.JOB_NOTCHANGED}, which should be interpreted
1128
        as such by the clients
1129

1130
    """
1131
    logging.debug("Waiting for changes in job %s", job_id)
1132

    
1133
    job_info = None
1134
    log_entries = None
1135

    
1136
    end_time = time.time() + timeout
1137
    while True:
1138
      delta_time = end_time - time.time()
1139
      if delta_time < 0:
1140
        return constants.JOB_NOTCHANGED
1141

    
1142
      job = self._LoadJobUnlocked(job_id)
1143
      if not job:
1144
        logging.debug("Job %s not found", job_id)
1145
        break
1146

    
1147
      status = job.CalcStatus()
1148
      job_info = self._GetJobInfoUnlocked(job, fields)
1149
      log_entries = job.GetLogEntries(prev_log_serial)
1150

    
1151
      # Serializing and deserializing data can cause type changes (e.g. from
1152
      # tuple to list) or precision loss. We're doing it here so that we get
1153
      # the same modifications as the data received from the client. Without
1154
      # this, the comparison afterwards might fail without the data being
1155
      # significantly different.
1156
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
1157
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
1158

    
1159
      if status not in (constants.JOB_STATUS_QUEUED,
1160
                        constants.JOB_STATUS_RUNNING,
1161
                        constants.JOB_STATUS_WAITLOCK):
1162
        # Don't even try to wait if the job is no longer running, there will be
1163
        # no changes.
1164
        break
1165

    
1166
      if (prev_job_info != job_info or
1167
          (log_entries and prev_log_serial != log_entries[0][0])):
1168
        break
1169

    
1170
      logging.debug("Waiting again")
1171

    
1172
      # Release the queue lock while waiting
1173
      job.change.wait(delta_time)
1174

    
1175
    logging.debug("Job %s changed", job_id)
1176

    
1177
    if job_info is None and log_entries is None:
1178
      return None
1179
    else:
1180
      return (job_info, log_entries)
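
  # Editor's note: a small demonstration of the normalization trick used in
  # WaitForJobChanges() above: a DumpJson/LoadJson round trip turns tuples
  # into lists (and applies the same precision loss the client sees), so the
  # comparison with prev_job_info behaves consistently.
  @staticmethod
  def _ExampleJsonNormalization():
    """Shows a tuple turning into a list after a JSON round trip."""
    job_info = ["42", "running", (1234567890, 250000)]
    normalized = serializer.LoadJson(serializer.DumpJson(job_info))
    # normalized == ["42", "running", [1234567890, 250000]]
    return normalized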
1181

    
1182
  @utils.LockedMethod
1183
  @_RequireOpenQueue
1184
  def CancelJob(self, job_id):
1185
    """Cancels a job.
1186

1187
    This will only succeed if the job has not started yet.
1188

1189
    @type job_id: string
1190
    @param job_id: job ID of job to be cancelled.
1191

1192
    """
1193
    logging.info("Cancelling job %s", job_id)
1194

    
1195
    job = self._LoadJobUnlocked(job_id)
1196
    if not job:
1197
      logging.debug("Job %s not found", job_id)
1198
      return (False, "Job %s not found" % job_id)
1199

    
1200
    job_status = job.CalcStatus()
1201

    
1202
    if job_status not in (constants.JOB_STATUS_QUEUED,
1203
                          constants.JOB_STATUS_WAITLOCK):
1204
      logging.debug("Job %s is no longer waiting in the queue", job.id)
1205
      return (False, "Job %s is no longer waiting in the queue" % job.id)
1206

    
1207
    if job_status == constants.JOB_STATUS_QUEUED:
1208
      self.CancelJobUnlocked(job)
1209
      return (True, "Job %s canceled" % job.id)
1210

    
1211
    elif job_status == constants.JOB_STATUS_WAITLOCK:
1212
      # The worker will notice the new status and cancel the job
1213
      try:
1214
        job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
1215
      finally:
1216
        self.UpdateJobUnlocked(job)
1217
      return (True, "Job %s will be canceled" % job.id)
1218

    
1219
  @_RequireOpenQueue
1220
  def CancelJobUnlocked(self, job):
1221
    """Marks a job as canceled.
1222

1223
    """
1224
    try:
1225
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1226
                            "Job canceled by request")
1227
    finally:
1228
      self.UpdateJobUnlocked(job)
1229

    
1230
  @_RequireOpenQueue
1231
  def _ArchiveJobsUnlocked(self, jobs):
1232
    """Archives jobs.
1233

1234
    @type jobs: list of L{_QueuedJob}
1235
    @param jobs: Job objects
1236
    @rtype: int
1237
    @return: Number of archived jobs
1238

1239
    """
1240
    archive_jobs = []
1241
    rename_files = []
1242
    for job in jobs:
1243
      if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
1244
                                  constants.JOB_STATUS_SUCCESS,
1245
                                  constants.JOB_STATUS_ERROR):
1246
        logging.debug("Job %s is not yet done", job.id)
1247
        continue
1248

    
1249
      archive_jobs.append(job)
1250

    
1251
      old = self._GetJobPath(job.id)
1252
      new = self._GetArchivedJobPath(job.id)
1253
      rename_files.append((old, new))
1254

    
1255
    # TODO: What if 1..n files fail to rename?
1256
    self._RenameFilesUnlocked(rename_files)
1257

    
1258
    logging.debug("Successfully archived job(s) %s",
1259
                  ", ".join(job.id for job in archive_jobs))
1260

    
1261
    return len(archive_jobs)
1262

    
1263
  @utils.LockedMethod
1264
  @_RequireOpenQueue
1265
  def ArchiveJob(self, job_id):
1266
    """Archives a job.
1267

1268
    This is just a wrapper over L{_ArchiveJobsUnlocked}.
1269

1270
    @type job_id: string
1271
    @param job_id: Job ID of job to be archived.
1272
    @rtype: bool
1273
    @return: Whether job was archived
1274

1275
    """
1276
    logging.info("Archiving job %s", job_id)
1277

    
1278
    job = self._LoadJobUnlocked(job_id)
1279
    if not job:
1280
      logging.debug("Job %s not found", job_id)
1281
      return False
1282

    
1283
    return self._ArchiveJobsUnlocked([job]) == 1
1284

    
1285
  @utils.LockedMethod
1286
  @_RequireOpenQueue
1287
  def AutoArchiveJobs(self, age, timeout):
1288
    """Archives all jobs based on age.
1289

1290
    The method will archive all jobs which are older than the age
1291
    parameter. For jobs that don't have an end timestamp, the start
1292
    timestamp will be considered. The special '-1' age will cause
1293
    archival of all jobs (that are not running or queued).
1294

1295
    @type age: int
1296
    @param age: the minimum age in seconds
    @type timeout: int
    @param timeout: the maximum amount of time (in seconds) to spend
        archiving jobs in one run
1297

1298
    """
1299
    logging.info("Archiving jobs with age more than %s seconds", age)
1300

    
1301
    now = time.time()
1302
    end_time = now + timeout
1303
    archived_count = 0
1304
    last_touched = 0
1305

    
1306
    all_job_ids = self._GetJobIDsUnlocked(archived=False)
1307
    pending = []
1308
    for idx, job_id in enumerate(all_job_ids):
1309
      last_touched = idx
1310

    
1311
      # Not optimal because jobs could be pending
1312
      # TODO: Measure average duration for job archival and take number of
1313
      # pending jobs into account.
1314
      if time.time() > end_time:
1315
        break
1316

    
1317
      # Returns None if the job failed to load
1318
      job = self._LoadJobUnlocked(job_id)
1319
      if job:
1320
        if job.end_timestamp is None:
1321
          if job.start_timestamp is None:
1322
            job_age = job.received_timestamp
1323
          else:
1324
            job_age = job.start_timestamp
1325
        else:
1326
          job_age = job.end_timestamp
1327

    
1328
        if age == -1 or now - job_age[0] > age:
1329
          pending.append(job)
1330

    
1331
          # Archive 10 jobs at a time
1332
          if len(pending) >= 10:
1333
            archived_count += self._ArchiveJobsUnlocked(pending)
1334
            pending = []
1335

    
1336
    if pending:
1337
      archived_count += self._ArchiveJobsUnlocked(pending)
1338

    
1339
    return (archived_count, len(all_job_ids) - last_touched - 1)
1340

    
1341
  def _GetJobInfoUnlocked(self, job, fields):
1342
    """Returns information about a job.
1343

1344
    @type job: L{_QueuedJob}
1345
    @param job: the job which we query
1346
    @type fields: list
1347
    @param fields: names of fields to return
1348
    @rtype: list
1349
    @return: list with one element for each field
1350
    @raise errors.OpExecError: when an invalid field
1351
        has been passed
1352

1353
    """
1354
    row = []
1355
    for fname in fields:
1356
      if fname == "id":
1357
        row.append(job.id)
1358
      elif fname == "status":
1359
        row.append(job.CalcStatus())
1360
      elif fname == "ops":
1361
        row.append([op.input.__getstate__() for op in job.ops])
1362
      elif fname == "opresult":
1363
        row.append([op.result for op in job.ops])
1364
      elif fname == "opstatus":
1365
        row.append([op.status for op in job.ops])
1366
      elif fname == "oplog":
1367
        row.append([op.log for op in job.ops])
1368
      elif fname == "opstart":
1369
        row.append([op.start_timestamp for op in job.ops])
1370
      elif fname == "opend":
1371
        row.append([op.end_timestamp for op in job.ops])
1372
      elif fname == "received_ts":
1373
        row.append(job.received_timestamp)
1374
      elif fname == "start_ts":
1375
        row.append(job.start_timestamp)
1376
      elif fname == "end_ts":
1377
        row.append(job.end_timestamp)
1378
      elif fname == "summary":
1379
        row.append([op.input.Summary() for op in job.ops])
1380
      else:
1381
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
1382
    return row
1383

    
1384
  @utils.LockedMethod
1385
  @_RequireOpenQueue
1386
  def QueryJobs(self, job_ids, fields):
1387
    """Returns a list of jobs in queue.
1388

1389
    This is a wrapper of L{_GetJobsUnlocked}, which actually does the
1390
    processing for each job.
1391

1392
    @type job_ids: list
1393
    @param job_ids: sequence of job identifiers or None for all
1394
    @type fields: list
1395
    @param fields: names of fields to return
1396
    @rtype: list
1397
    @return: a list with one element per job, each element being a list with
1398
        the requested fields
1399

1400
    """
1401
    jobs = []
1402

    
1403
    for job in self._GetJobsUnlocked(job_ids):
1404
      if job is None:
1405
        jobs.append(None)
1406
      else:
1407
        jobs.append(self._GetJobInfoUnlocked(job, fields))
1408

    
1409
    return jobs
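
  # Editor's note: an illustration of how QueryJobs() is typically driven.
  # The field names come from _GetJobInfoUnlocked() above; the job IDs and
  # the queue instance are hypothetical and assume a running master daemon.
  @staticmethod
  def _ExampleQueryJobs(queue):
    """Queries a few common fields for two (hypothetical) jobs."""
    fields = ["id", "status", "summary", "opstatus", "received_ts"]
    # One row is returned per requested job; unknown job IDs yield None rows.
    return queue.QueryJobs(["1", "2"], fields)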
1410

    
1411
  @utils.LockedMethod
1412
  @_RequireOpenQueue
1413
  def Shutdown(self):
1414
    """Stops the job queue.
1415

1416
    This shuts down all the worker threads and closes the queue.
1417

1418
    """
1419
    self._wpool.TerminateWorkers()
1420

    
1421
    self._queue_lock.Close()
1422
    self._queue_lock = None