lib/jqueue.py @ dfe57c22

#
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling."""

import os
import logging
import threading
import errno
import re
import time

from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc


JOBQUEUE_THREADS = 5


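# The job queue is built from three layers, defined below: _QueuedOpCode
# wraps a single opcode together with its status, result and execution log;
# _QueuedJob groups the opcodes submitted as one job; JobQueue owns the
# on-disk queue files, the in-memory job cache and the worker pool of
# _JobQueueWorker threads that actually executes the jobs.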
class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  Access to the log is synchronized by the '_log_lock' attribute.

  The 'log' attribute holds the execution log and consists of tuples
  of the form (timestamp, level, message).

  """
  def __new__(cls, *args, **kwargs):
    obj = object.__new__(cls, *args, **kwargs)
    # Create a special lock for logging
    obj._log_lock = threading.Lock()
    return obj

  def __init__(self, op):
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []

  @classmethod
  def Restore(cls, state):
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    return obj

  def Serialize(self):
    self._log_lock.acquire()
    try:
      return {
        "input": self.input.__getstate__(),
        "status": self.status,
        "result": self.result,
        "log": self.log,
        }
    finally:
      self._log_lock.release()

  def Log(self, *args):
    """Append a log entry.

    """
    assert len(args) < 3

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      log_type, log_msg = args

    self._log_lock.acquire()
    try:
      # The time is split to make serialization easier and not lose more
      # precision.
      self.log.append((utils.SplitTime(time.time()), log_type, log_msg))
    finally:
      self._log_lock.release()

  def RetrieveLog(self, start_at=0):
    """Retrieve (a part of) the execution log.

    """
    self._log_lock.acquire()
    try:
      return self.log[start_at:]
    finally:
      self._log_lock.release()


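# A _QueuedJob (below) serializes to a plain dict that is stored on disk as
# JSON, roughly of the form (values shown are illustrative only):
#
#   {"id": "<job id>",
#    "ops": [{"input": <opcode state>, "status": "queued", "result": None,
#             "log": [[(sec, usec), "message", "..."], ...]}, ...],
#    "run_op_index": -1}
#
# The per-opcode dicts come from _QueuedOpCode.Serialize above; each log
# entry is a (timestamp, type, message) tuple as appended by Log().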
class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs.

  """
  def __new__(cls, *args, **kwargs):
    obj = object.__new__(cls, *args, **kwargs)
    # Condition to wait for changes
    obj.change = threading.Condition()
    return obj

  def __init__(self, queue, job_id, ops):
    if not ops:
      # TODO
      raise Exception("No opcodes")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.run_op_index = -1

  @classmethod
  def Restore(cls, queue, state):
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.ops = [_QueuedOpCode.Restore(op_state) for op_state in state["ops"]]
    obj.run_op_index = state["run_op_index"]
    return obj

  def Serialize(self):
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "run_op_index": self.run_op_index,
      }

  def CalcStatus(self):
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status


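# Each worker thread processes one job at a time: for every opcode it marks
# the opcode as running, executes it through mcpu.Processor without holding
# the queue lock, and writes the updated job back to disk after every status
# change (see RunTask below).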
class _JobQueueWorker(workerpool.BaseWorker):
  def RunTask(self, job):
    """Job executor.

    This function processes a job.

    """
    logging.debug("Worker %s processing job %s",
                  self.worker_id, job.id)
    proc = mcpu.Processor(self.pool.queue.context)
    queue = job.queue
    try:
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          try:
            logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)

            queue.acquire()
            try:
              job.run_op_index = idx
              op.status = constants.OP_STATUS_RUNNING
              op.result = None
              queue.UpdateJobUnlocked(job)

              input_opcode = op.input
            finally:
              queue.release()

            def _Log(*args):
              op.Log(*args)

              job.change.acquire()
              try:
                job.change.notifyAll()
              finally:
                job.change.release()

            result = proc.ExecOpCode(input_opcode, _Log)

            queue.acquire()
            try:
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.debug("Op %s/%s: Successfully finished %s",
                          idx + 1, count, op)
          except Exception, err:
            queue.acquire()
            try:
              try:
                op.status = constants.OP_STATUS_ERROR
                op.result = str(err)
                logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
              finally:
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
    finally:
      queue.acquire()
      try:
        job_id = job.id
        status = job.CalcStatus()
      finally:
        queue.release()
      logging.debug("Worker %s finished job %s, status = %s",
                    self.worker_id, job_id, status)


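# The worker pool keeps a reference back to its JobQueue so that the workers
# can reach the master context (RunTask above builds its mcpu.Processor from
# self.pool.queue.context).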
class _JobQueueWorkerPool(workerpool.WorkerPool):
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


class JobQueue(object):
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def _RequireOpenQueue(fn):
    """Decorator for "public" functions.

    This decorator should be used for all "public" functions, that is,
    functions usually called from other classes.

    Important: Use this decorator only after utils.LockedMethod!

    Example:
      @utils.LockedMethod
      @_RequireOpenQueue
      def Example(self):
        pass

    """
    def wrapper(self, *args, **kwargs):
      assert self._queue_lock is not None, "Queue should be open"
      return fn(self, *args, **kwargs)
    return wrapper

  def __init__(self, context):
    self.context = context
    self._memcache = {}
    self._my_hostname = utils.HostInfo().name

    # Locking
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = set(self.context.cfg.GetNodeList())

    # Remove master node
    try:
      self._nodes.remove(self._my_hostname)
    except KeyError:
      pass

    # TODO: Check consistency across nodes

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)

    # We need to lock here because WorkerPool.AddTask() may start a job while
    # we're still doing our work.
    self.acquire()
    try:
      for job in self._GetJobsUnlocked(None):
        status = job.CalcStatus()

        if status in (constants.JOB_STATUS_QUEUED, ):
          self._wpool.AddTask(job)

        elif status in (constants.JOB_STATUS_RUNNING, ):
          logging.warning("Unfinished job %s found: %s", job.id, job)
          try:
            for op in job.ops:
              op.status = constants.OP_STATUS_ERROR
              op.result = "Unclean master daemon shutdown"
          finally:
            self.UpdateJobUnlocked(job)
    finally:
      self.release()

  @utils.LockedMethod
  @_RequireOpenQueue
  def AddNode(self, node_name):
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    rpc.call_jobqueue_purge(node_name)

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      # Read file content
      fd = open(file_name, "r")
      try:
        content = fd.read()
      finally:
        fd.close()

      result = rpc.call_jobqueue_update([node_name], file_name, content)
      if not result[node_name]:
        logging.error("Failed to upload %s to %s", file_name, node_name)

    self._nodes.add(node_name)

  @utils.LockedMethod
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    try:
      # The queue is removed by the "leave node" RPC call.
      self._nodes.remove(node_name)
    except KeyError:
      pass

  def _WriteAndReplicateFileUnlocked(self, file_name, data):
    """Writes a file locally and then replicates it to all nodes.

    """
    utils.WriteFile(file_name, data=data)

    failed_nodes = 0
    result = rpc.call_jobqueue_update(self._nodes, file_name, data)
    for node in self._nodes:
      if not result[node]:
        failed_nodes += 1
        logging.error("Copy of job queue file to node %s failed", node)

    # TODO: check failed_nodes

  def _RenameFileUnlocked(self, old, new):
    os.rename(old, new)

    result = rpc.call_jobqueue_rename(self._nodes, old, new)
    for node in self._nodes:
      if not result[node]:
        logging.error("Moving %s to %s failed on %s", old, new, node)

    # TODO: check failed nodes

  def _FormatJobID(self, job_id):
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

  def _NewSerialUnlocked(self):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    Returns: A string representing the job identifier.

    """
    # New number
    serial = self._last_serial + 1

    # Write to file
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                        "%s\n" % serial)

    # Keep it only if we were able to write the file
    self._last_serial = serial

    return self._FormatJobID(serial)

  @staticmethod
  def _GetJobPath(job_id):
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)

  @staticmethod
  def _GetArchivedJobPath(job_id):
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)

  @classmethod
  def _ExtractJobID(cls, name):
    m = cls._RE_JOB_FILE.match(name)
    if m:
      return m.group(1)
    else:
      return None

  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived job IDs will be
    included. Currently this argument is unused.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    """
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
    jlist.sort()
    return jlist

  def _ListJobFiles(self):
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
            if self._RE_JOB_FILE.match(name)]

  def _LoadJobUnlocked(self, job_id):
    if job_id in self._memcache:
      logging.debug("Found job %s in memcache", job_id)
      return self._memcache[job_id]

    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      fd = open(filepath, "r")
    except IOError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise
    try:
      data = serializer.LoadJson(fd.read())
    finally:
      fd.close()

    job = _QueuedJob.Restore(self, data)
    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _GetJobsUnlocked(self, job_ids):
    if not job_ids:
      job_ids = self._GetJobIDsUnlocked()

    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type ops: list
    @param ops: The list of OpCodes that will become the new job.

    """
    # Get job identifier
    job_id = self._NewSerialUnlocked()
    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self.UpdateJobUnlocked(job)

    logging.debug("Added new job %s to the cache", job_id)
    self._memcache[job_id] = job

    # Add to worker pool
    self._wpool.AddTask(job)

    return job.id

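  # Illustrative use of SubmitJob only; the opcode class below is an
  # arbitrary example, any non-empty list of ganeti.opcodes instances can
  # be submitted:
  #
  #   job_id = queue.SubmitJob([opcodes.OpTestDelay(duration=10)])
  #   queue.QueryJobs([job_id], ["id", "status"])
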
  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job):
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._WriteAndReplicateFileUnlocked(filename, data)
    self._CleanCacheUnlocked([job.id])

    # Notify waiters about potential changes
    job.change.acquire()
    try:
      job.change.notifyAll()
    finally:
      job.change.release()

  def _CleanCacheUnlocked(self, exclude):
    """Clean the memory cache.

    The exclude argument contains job IDs that should not be
    cleaned.

    """
    assert isinstance(exclude, list)

    for job in self._memcache.values():
      if job.id in exclude:
        continue
      if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,
                                  constants.JOB_STATUS_RUNNING):
        logging.debug("Cleaning job %s from the cache", job.id)
        try:
          del self._memcache[job.id]
        except KeyError:
          pass

  @_RequireOpenQueue
  def WaitForJobChanges(self, job_id, fields, previous):
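    """Waits for changes in a job.

    Blocks until the values of the requested fields differ from those
    given in 'previous', then returns the new values (or None if the job
    cannot be found).

    """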
    logging.debug("Waiting for changes in job %s", job_id)

    while True:
      self.acquire()
      try:
        job = self._LoadJobUnlocked(job_id)
        if not job:
          logging.debug("Job %s not found", job_id)
          new_state = None
          break

        new_state = self._GetJobInfoUnlocked(job, fields)
      finally:
        self.release()

      # Serializing and deserializing data can cause type changes (e.g. from
      # tuple to list) or precision loss. We're doing it here so that we get
      # the same modifications as the data received from the client. Without
      # this, the comparison afterwards might fail without the data being
      # significantly different.
      new_state = serializer.LoadJson(serializer.DumpJson(new_state))

      if previous != new_state:
        break

      job.change.acquire()
      try:
        job.change.wait()
      finally:
        job.change.release()

    logging.debug("Job %s changed", job_id)

    return new_state

  @utils.LockedMethod
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    @type job_id: string
    @param job_id: Job ID of job to be cancelled.

    """
    logging.debug("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return

    if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,):
      logging.debug("Job %s is no longer in the queue", job.id)
      return

    try:
      for op in job.ops:
        op.status = constants.OP_STATUS_ERROR
        op.result = "Job cancelled by request"
    finally:
      self.UpdateJobUnlocked(job)

  @utils.LockedMethod
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    @type job_id: string
    @param job_id: Job ID of job to be archived.

    """
    logging.debug("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return

    if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
                                constants.JOB_STATUS_SUCCESS,
                                constants.JOB_STATUS_ERROR):
      logging.debug("Job %s is not yet done", job.id)
      return

    try:
      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)

      self._RenameFileUnlocked(old, new)

      logging.debug("Successfully archived job %s", job.id)
    finally:
      # Cleaning the cache because we don't know what os.rename actually did
      # and to be on the safe side.
      self._CleanCacheUnlocked([])

  def _GetJobInfoUnlocked(self, job, fields):
    row = []
    for fname in fields:
      if fname == "id":
        row.append(job.id)
      elif fname == "status":
        row.append(job.CalcStatus())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in job.ops])
      elif fname == "opresult":
        row.append([op.result for op in job.ops])
      elif fname == "opstatus":
        row.append([op.status for op in job.ops])
      elif fname == "ticker":
        ji = job.run_op_index
        if ji < 0:
          lmsg = None
        else:
          lmsg = job.ops[ji].RetrieveLog(-1)
          # message might be empty here
          if lmsg:
            lmsg = lmsg[0]
          else:
            lmsg = None
        row.append(lmsg)
      else:
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
    return row

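  # Field names understood by _GetJobInfoUnlocked above (and therefore by
  # QueryJobs and WaitForJobChanges): "id", "status", "ops", "opresult",
  # "opstatus" and "ticker".
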
  @utils.LockedMethod
  @_RequireOpenQueue
  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    Args:
    - job_ids: Sequence of job identifiers or None for all
    - fields: Names of fields to return

    """
    jobs = []

    for job in self._GetJobsUnlocked(job_ids):
      if job is None:
        jobs.append(None)
      else:
        jobs.append(self._GetJobInfoUnlocked(job, fields))

    return jobs

  @utils.LockedMethod
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_lock.Close()
    self._queue_lock = None