Revision 6c5a7090

b/daemons/ganeti-masterd
217 217
      return queue.ArchiveJob(job_id)
218 218

  
219 219
    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
220
      (job_id, fields, previous) = args
221
      return queue.WaitForJobChanges(job_id, fields, previous)
220
      (job_id, fields, prev_job_info, prev_log_serial) = args
221
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
222
                                     prev_log_serial)
222 223

  
223 224
    elif method == luxi.REQ_QUERY_JOBS:
224 225
      (job_ids, fields) = args
b/lib/cli.py
405 405
  if cl is None:
406 406
    cl = GetClient()
407 407

  
408
  state = None
409
  lastmsg = None
408
  prev_job_info = None
409
  prev_logmsg_serial = None
410

  
410 411
  while True:
411
    state = cl.WaitForJobChange(job_id, ["status", "ticker"], state)
412
    if not state:
412
    result = cl.WaitForJobChange(job_id, ["status"], prev_job_info,
413
                                 prev_logmsg_serial)
414
    if not result:
413 415
      # job not found, go away!
414 416
      raise errors.JobLost("Job with id %s lost" % job_id)
415 417

  
418
    # Split result, a tuple of (field values, log entries)
419
    (job_info, log_entries) = result
420
    (status, ) = job_info
421

  
422
    if log_entries:
423
      for log_entry in log_entries:
424
        (serial, timestamp, _, message) = log_entry
425
        if callable(feedback_fn):
426
          feedback_fn(log_entry[1:])
427
        else:
428
          print "%s %s" % (time.ctime(utils.MergeTime(timestamp)), message)
429
        prev_logmsg_serial = max(prev_logmsg_serial, serial)
430

  
416 431
    # TODO: Handle canceled and archived jobs
417
    status = state[0]
418
    if status in (constants.JOB_STATUS_SUCCESS, constants.JOB_STATUS_ERROR):
432
    elif status in (constants.JOB_STATUS_SUCCESS, constants.JOB_STATUS_ERROR):
419 433
      break
420
    msg = state[1]
421
    if msg is not None and msg != lastmsg:
422
      if callable(feedback_fn):
423
        feedback_fn(msg)
424
      else:
425
        print "%s %s" % (time.ctime(utils.MergeTime(msg[0])), msg[2])
426
    lastmsg = msg
434

  
435
    prev_job_info = job_info
427 436

  
428 437
  jobs = cl.QueryJobs([job_id], ["status", "opresult"])
429 438
  if not jobs:
b/lib/jqueue.py
19 19
# 02110-1301, USA.
20 20

  
21 21

  
22
"""Module implementing the job queue handling."""
22
"""Module implementing the job queue handling.
23

  
24
Locking:
25
There's a single, large lock in the JobQueue class. It's used by all other
26
classes in this module.
27

  
28
"""
23 29

  
24 30
import os
25 31
import logging
......
45 51
class _QueuedOpCode(object):
46 52
  """Encapsulates an opcode object.
47 53

  
48
  Access is synchronized by the '_lock' attribute.
49

  
50 54
  The 'log' attribute holds the execution log and consists of tuples
51
  of the form (timestamp, level, message).
55
  of the form (log_serial, timestamp, level, message).
52 56

  
53 57
  """
54
  def __new__(cls, *args, **kwargs):
55
    obj = object.__new__(cls, *args, **kwargs)
56
    # Create a special lock for logging
57
    obj._log_lock = threading.Lock()
58
    return obj
59

  
60 58
  def __init__(self, op):
61 59
    self.input = op
62 60
    self.status = constants.OP_STATUS_QUEUED
......
73 71
    return obj
74 72

  
75 73
  def Serialize(self):
76
    self._log_lock.acquire()
77
    try:
78
      return {
79
        "input": self.input.__getstate__(),
80
        "status": self.status,
81
        "result": self.result,
82
        "log": self.log,
83
        }
84
    finally:
85
      self._log_lock.release()
86

  
87
  def Log(self, *args):
88
    """Append a log entry.
89

  
90
    """
91
    assert len(args) < 3
92

  
93
    if len(args) == 1:
94
      log_type = constants.ELOG_MESSAGE
95
      log_msg = args[0]
96
    else:
97
      log_type, log_msg = args
98

  
99
    self._log_lock.acquire()
100
    try:
101
      # The time is split to make serialization easier and not lose more
102
      # precision.
103
      self.log.append((utils.SplitTime(time.time()), log_type, log_msg))
104
    finally:
105
      self._log_lock.release()
106

  
107
  def RetrieveLog(self, start_at=0):
108
    """Retrieve (a part of) the execution log.
109

  
110
    """
111
    self._log_lock.acquire()
112
    try:
113
      return self.log[start_at:]
114
    finally:
115
      self._log_lock.release()
74
    return {
75
      "input": self.input.__getstate__(),
76
      "status": self.status,
77
      "result": self.result,
78
      "log": self.log,
79
      }
116 80

  
117 81

  
118 82
class _QueuedJob(object):
119 83
  """In-memory job representation.
120 84

  
121
  This is what we use to track the user-submitted jobs.
85
  This is what we use to track the user-submitted jobs. Locking must be taken
86
  care of by users of this class.
122 87

  
123 88
  """
124
  def __new__(cls, *args, **kwargs):
125
    obj = object.__new__(cls, *args, **kwargs)
126
    # Condition to wait for changes
127
    obj.change = threading.Condition()
128
    return obj
129

  
130 89
  def __init__(self, queue, job_id, ops):
131 90
    if not ops:
132 91
      # TODO
......
136 95
    self.id = job_id
137 96
    self.ops = [_QueuedOpCode(op) for op in ops]
138 97
    self.run_op_index = -1
98
    self.log_serial = 0
99

  
100
    # Condition to wait for changes
101
    self.change = threading.Condition(self.queue._lock)
139 102

  
140 103
  @classmethod
141 104
  def Restore(cls, queue, state):
142 105
    obj = _QueuedJob.__new__(cls)
143 106
    obj.queue = queue
144 107
    obj.id = state["id"]
145
    obj.ops = [_QueuedOpCode.Restore(op_state) for op_state in state["ops"]]
146 108
    obj.run_op_index = state["run_op_index"]
109

  
110
    obj.ops = []
111
    obj.log_serial = 0
112
    for op_state in state["ops"]:
113
      op = _QueuedOpCode.Restore(op_state)
114
      for log_entry in op.log:
115
        obj.log_serial = max(obj.log_serial, log_entry[0])
116
      obj.ops.append(op)
117

  
118
    # Condition to wait for changes
119
    obj.change = threading.Condition(obj.queue._lock)
120

  
147 121
    return obj
148 122

  
149 123
  def Serialize(self):
......
180 154

  
181 155
    return status
182 156

  
157
  def GetLogEntries(self, newer_than):
158
    if newer_than is None:
159
      serial = -1
160
    else:
161
      serial = newer_than
162

  
163
    entries = []
164
    for op in self.ops:
165
      entries.extend(filter(lambda entry: entry[0] > newer_than, op.log))
166

  
167
    return entries
168

  
183 169

  
184 170
class _JobQueueWorker(workerpool.BaseWorker):
185 171
  def RunTask(self, job):
186 172
    """Job executor.
187 173

  
188
    This function processes a job.
174
    This function processes a job. It is closely tied to the _QueuedJob and
175
    _QueuedOpCode classes.
189 176

  
190 177
    """
191 178
    logging.debug("Worker %s processing job %s",
......
211 198
              queue.release()
212 199

  
213 200
            def _Log(*args):
214
              op.Log(*args)
201
              """Append a log entry.
202

  
203
              """
204
              assert len(args) < 3
205

  
206
              if len(args) == 1:
207
                log_type = constants.ELOG_MESSAGE
208
                log_msg = args[0]
209
              else:
210
                log_type, log_msg = args
211

  
212
              # The time is split to make serialization easier and not lose
213
              # precision.
214
              timestamp = utils.SplitTime(time.time())
215 215

  
216
              job.change.acquire()
216
              queue.acquire()
217 217
              try:
218
                job.log_serial += 1
219
                op.log.append((job.log_serial, timestamp, log_type, log_msg))
220

  
218 221
                job.change.notifyAll()
219 222
              finally:
220
                job.change.release()
223
                queue.release()
221 224

  
225
            # Make sure not to hold lock while _Log is called
222 226
            result = proc.ExecOpCode(input_opcode, _Log)
223 227

  
224 228
            queue.acquire()
......
532 536
    self._CleanCacheUnlocked([job.id])
533 537

  
534 538
    # Notify waiters about potential changes
535
    job.change.acquire()
536
    try:
537
      job.change.notifyAll()
538
    finally:
539
      job.change.release()
539
    job.change.notifyAll()
540 540

  
541 541
  def _CleanCacheUnlocked(self, exclude):
542 542
    """Clean the memory cache.
......
558 558
        except KeyError:
559 559
          pass
560 560

  
561
  @utils.LockedMethod
561 562
  @_RequireOpenQueue
562
  def WaitForJobChanges(self, job_id, fields, previous):
563
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial):
564
    """Waits for changes in a job.
565

  
566
    @type job_id: string
567
    @param job_id: Job identifier
568
    @type fields: list of strings
569
    @param fields: Which fields to check for changes
570
    @type prev_job_info: list or None
571
    @param prev_job_info: Last job information returned
572
    @type prev_log_serial: int
573
    @param prev_log_serial: Last job message serial number
574

  
575
    """
563 576
    logging.debug("Waiting for changes in job %s", job_id)
564 577

  
565 578
    while True:
566
      self.acquire()
567
      try:
568
        job = self._LoadJobUnlocked(job_id)
569
        if not job:
570
          logging.debug("Job %s not found", job_id)
571
          new_state = None
572
          break
579
      job = self._LoadJobUnlocked(job_id)
580
      if not job:
581
        logging.debug("Job %s not found", job_id)
582
        new_state = None
583
        break
573 584

  
574
        new_state = self._GetJobInfoUnlocked(job, fields)
575
      finally:
576
        self.release()
585
      status = job.CalcStatus()
586
      job_info = self._GetJobInfoUnlocked(job, fields)
587
      log_entries = job.GetLogEntries(prev_log_serial)
577 588

  
578 589
      # Serializing and deserializing data can cause type changes (e.g. from
579 590
      # tuple to list) or precision loss. We're doing it here so that we get
580 591
      # the same modifications as the data received from the client. Without
581 592
      # this, the comparison afterwards might fail without the data being
582 593
      # significantly different.
583
      new_state = serializer.LoadJson(serializer.DumpJson(new_state))
594
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
595
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
584 596

  
585
      if previous != new_state:
597
      if status not in (constants.JOB_STATUS_QUEUED,
598
                        constants.JOB_STATUS_RUNNING):
599
        # Don't even try to wait if the job is no longer running, there will be
600
        # no changes.
586 601
        break
587 602

  
588
      job.change.acquire()
589
      try:
590
        job.change.wait()
591
      finally:
592
        job.change.release()
603
      if (prev_job_info != job_info or
604
          (log_entries and prev_log_serial != log_entries[0][0])):
605
        break
606

  
607
      logging.debug("Waiting again")
608

  
609
      # Release the queue lock while waiting
610
      job.change.wait()
593 611

  
594 612
    logging.debug("Job %s changed", job_id)
595 613

  
596
    return new_state
614
    return (job_info, log_entries)
597 615

  
598 616
  @utils.LockedMethod
599 617
  @_RequireOpenQueue
......
669 687
        row.append([op.result for op in job.ops])
670 688
      elif fname == "opstatus":
671 689
        row.append([op.status for op in job.ops])
672
      elif fname == "ticker":
673
        ji = job.run_op_index
674
        if ji < 0:
675
          lmsg = None
676
        else:
677
          lmsg = job.ops[ji].RetrieveLog(-1)
678
          # message might be empty here
679
          if lmsg:
680
            lmsg = lmsg[0]
681
          else:
682
            lmsg = None
683
        row.append(lmsg)
684 690
      else:
685 691
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
686 692
    return row
b/lib/luxi.py
289 289
  def ArchiveJob(self, job_id):
290 290
    return self.CallMethod(REQ_ARCHIVE_JOB, job_id)
291 291

  
292
  def WaitForJobChange(self, job_id, fields, previous):
292
  def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
293 293
    return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
294
                           (job_id, fields, previous))
294
                           (job_id, fields, prev_job_info, prev_log_serial))
295 295

  
296 296
  def QueryJobs(self, job_ids, fields):
297 297
    return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))

Also available in: Unified diff