Revision f1048938

b/lib/cli.py
382 382

  
383 383
  job_id = cl.SubmitJob([op])
384 384

  
385
  lastmsg = None
385 386
  while True:
386
    jobs = cl.QueryJobs([job_id], ["status"])
387
    jobs = cl.QueryJobs([job_id], ["status", "ticker"])
387 388
    if not jobs:
388 389
      # job not found, go away!
389 390
      raise errors.JobLost("Job with id %s lost" % job_id)
......
392 393
    status = jobs[0][0]
393 394
    if status in (constants.JOB_STATUS_SUCCESS, constants.JOB_STATUS_ERROR):
394 395
      break
396
    msg = jobs[0][1]
397
    if msg is not None and msg != lastmsg:
398
      print "%s %s" % (time.ctime(msg[0]), msg[2])
399
    lastmsg = msg
395 400
    time.sleep(1)
396 401

  
397 402
  jobs = cl.QueryJobs([job_id], ["status", "opresult"])
b/lib/constants.py
262 262
OP_STATUS_RUNNING = "running"
263 263
OP_STATUS_SUCCESS = "success"
264 264
OP_STATUS_ERROR = "error"
265

  
266
# Execution log types
267
ELOG_MESSAGE = "message"
268
ELOG_PROGRESS = "progress"
b/lib/jqueue.py
26 26
import threading
27 27
import errno
28 28
import re
29
import time
29 30

  
30 31
from ganeti import constants
31 32
from ganeti import serializer
......
44 45

  
45 46
  Access is synchronized by the '_lock' attribute.
46 47

  
48
  The 'log' attribute holds the execution log and consists of tuples
49
  of the form (timestamp, level, message).
50

  
47 51
  """
48 52
  def __init__(self, op):
49
    self.__Setup(op, constants.OP_STATUS_QUEUED, None)
53
    self.__Setup(op, constants.OP_STATUS_QUEUED, None, [])
50 54

  
51
  def __Setup(self, input, status, result):
55
  def __Setup(self, input_, status, result, log):
52 56
    self._lock = threading.Lock()
53
    self.input = input
57
    self.input = input_
54 58
    self.status = status
55 59
    self.result = result
60
    self.log = log
56 61

  
57 62
  @classmethod
58 63
  def Restore(cls, state):
59 64
    obj = object.__new__(cls)
60 65
    obj.__Setup(opcodes.OpCode.LoadOpCode(state["input"]),
61
                state["status"], state["result"])
66
                state["status"], state["result"], state["log"])
62 67
    return obj
63 68

  
64 69
  @utils.LockedMethod
......
67 72
      "input": self.input.__getstate__(),
68 73
      "status": self.status,
69 74
      "result": self.result,
75
      "log": self.log,
70 76
      }
71 77

  
72 78
  @utils.LockedMethod
......
98 104
    """
99 105
    return self.result
100 106

  
107
  @utils.LockedMethod
108
  def Log(self, *args):
109
    """Append a log entry.
110

  
111
    """
112
    assert len(args) < 2
113

  
114
    if len(args) == 1:
115
      log_type = constants.ELOG_MESSAGE
116
      log_msg = args[0]
117
    else:
118
      log_type, log_msg = args
119
    self.log.append((time.time(), log_type, log_msg))
120

  
121
  @utils.LockedMethod
122
  def RetrieveLog(self, start_at=0):
123
    """Retrieve (a part of) the execution log.
124

  
125
    """
126
    return self.log[start_at:]
127

  
101 128

  
102 129
class _QueuedJob(object):
103 130
  """In-memory job representation.
......
110 137
      # TODO
111 138
      raise Exception("No opcodes")
112 139

  
113
    self.__Setup(storage, job_id, [_QueuedOpCode(op) for op in ops])
140
    self.__Setup(storage, job_id, [_QueuedOpCode(op) for op in ops], -1)
114 141

  
115
  def __Setup(self, storage, job_id, ops):
142
  def __Setup(self, storage, job_id, ops, run_op_index):
143
    self._lock = threading.Lock()
116 144
    self.storage = storage
117 145
    self.id = job_id
118 146
    self._ops = ops
147
    self.run_op_index = run_op_index
119 148

  
120 149
  @classmethod
121 150
  def Restore(cls, storage, state):
122 151
    obj = object.__new__(cls)
123
    obj.__Setup(storage, state["id"],
124
                [_QueuedOpCode.Restore(op_state) for op_state in state["ops"]])
152
    op_list = [_QueuedOpCode.Restore(op_state) for op_state in state["ops"]]
153
    obj.__Setup(storage, state["id"], op_list, state["run_op_index"])
125 154
    return obj
126 155

  
127 156
  def Serialize(self):
128 157
    return {
129 158
      "id": self.id,
130 159
      "ops": [op.Serialize() for op in self._ops],
160
      "run_op_index": self.run_op_index,
131 161
      }
132 162

  
133 163
  def SetUnclean(self, msg):
......
162 192

  
163 193
    return status
164 194

  
195
  @utils.LockedMethod
196
  def GetRunOpIndex(self):
197
    return self.run_op_index
198

  
165 199
  def Run(self, proc):
166 200
    """Job executor.
167 201

  
......
177 211
      for idx, op in enumerate(self._ops):
178 212
        try:
179 213
          logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
214

  
215
          self._lock.acquire()
216
          try:
217
            self.run_op_index = idx
218
          finally:
219
            self._lock.release()
220

  
180 221
          op.SetStatus(constants.OP_STATUS_RUNNING, None)
181 222
          self.storage.UpdateJob(self)
182 223

  
183
          result = proc.ExecOpCode(op.input)
224
          result = proc.ExecOpCode(op.input, op.Log)
184 225

  
185 226
          op.SetStatus(constants.OP_STATUS_SUCCESS, result)
186 227
          self.storage.UpdateJob(self)
......
207 248
    logging.debug("Worker %s processing job %s",
208 249
                  self.worker_id, job.id)
209 250
    # TODO: feedback function
210
    proc = mcpu.Processor(self.pool.context, feedback=lambda x: None)
251
    proc = mcpu.Processor(self.pool.context)
211 252
    try:
212 253
      job.Run(proc)
213 254
    finally:
......
477 518
        row.append([op.GetResult() for op in job._ops])
478 519
      elif fname == "opstatus":
479 520
        row.append([op.GetStatus() for op in job._ops])
521
      elif fname == "ticker":
522
        ji = job.GetRunOpIndex()
523
        if ji < 0:
524
          lmsg = None
525
        else:
526
          lmsg = job._ops[ji].RetrieveLog(-1)
527
          # message might be empty here
528
          if lmsg:
529
            lmsg = lmsg[0]
530
          else:
531
            lmsg = None
532
        row.append(lmsg)
480 533
      else:
481 534
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
482 535
    return row
b/lib/mcpu.py
90 90
    opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
91 91
    }
92 92

  
93
  def __init__(self, context, feedback=None):
93
  def __init__(self, context):
94 94
    """Constructor for Processor
95 95

  
96 96
    Args:
......
98 98
                    interesting events are happening
99 99
    """
100 100
    self.context = context
101
    self._feedback_fn = feedback
101
    self._feedback_fn = None
102 102
    self.exclusive_BGL = False
103 103

  
104 104
  def _ExecLU(self, lu):
......
146 146

  
147 147
    return result
148 148

  
149
  def ExecOpCode(self, op):
149
  def ExecOpCode(self, op, feedback_fn):
150 150
    """Execute an opcode.
151 151

  
152 152
    Args:
......
157 157
      raise errors.ProgrammerError("Non-opcode instance passed"
158 158
                                   " to ExecOpcode")
159 159

  
160
    self._feedback_fn = feedback_fn
160 161
    lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
161 162
    if lu_class is None:
162 163
      raise errors.OpCodeUnknown("Unknown opcode")

Also available in: Unified diff