Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ abc1f2ce

History | View | Annotate | Download (17.3 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the job queue handling."""
23

    
24
import os
25
import logging
26
import threading
27
import errno
28
import re
29
import time
30

    
31
from ganeti import constants
32
from ganeti import serializer
33
from ganeti import workerpool
34
from ganeti import opcodes
35
from ganeti import errors
36
from ganeti import mcpu
37
from ganeti import utils
38
from ganeti import jstore
39
from ganeti import rpc
40

    
41

    
42
# Passed as the first argument to the worker pool constructor by
# _JobQueueWorkerPool; presumably the number of worker threads (confirm
# against workerpool.WorkerPool).
JOBQUEUE_THREADS = 5
43

    
44

    
45
class _QueuedOpCode(object):
46
  """Encasulates an opcode object.
47

48
  Access is synchronized by the '_lock' attribute.
49

50
  The 'log' attribute holds the execution log and consists of tuples
51
  of the form (timestamp, level, message).
52

53
  """
54
  def __new__(cls, *args, **kwargs):
55
    obj = object.__new__(cls, *args, **kwargs)
56
    # Create a special lock for logging
57
    obj._log_lock = threading.Lock()
58
    return obj
59

    
60
  def __init__(self, op):
61
    self.input = op
62
    self.status = constants.OP_STATUS_QUEUED
63
    self.result = None
64
    self.log = []
65

    
66
  @classmethod
67
  def Restore(cls, state):
68
    obj = _QueuedOpCode.__new__(cls)
69
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
70
    obj.status = state["status"]
71
    obj.result = state["result"]
72
    obj.log = state["log"]
73
    return obj
74

    
75
  def Serialize(self):
76
    self._log_lock.acquire()
77
    try:
78
      return {
79
        "input": self.input.__getstate__(),
80
        "status": self.status,
81
        "result": self.result,
82
        "log": self.log,
83
        }
84
    finally:
85
      self._log_lock.release()
86

    
87
  def Log(self, *args):
88
    """Append a log entry.
89

90
    """
91
    assert len(args) < 3
92

    
93
    if len(args) == 1:
94
      log_type = constants.ELOG_MESSAGE
95
      log_msg = args[0]
96
    else:
97
      log_type, log_msg = args
98

    
99
    self._log_lock.acquire()
100
    try:
101
      self.log.append((time.time(), log_type, log_msg))
102
    finally:
103
      self._log_lock.release()
104

    
105
  def RetrieveLog(self, start_at=0):
106
    """Retrieve (a part of) the execution log.
107

108
    """
109
    self._log_lock.acquire()
110
    try:
111
      return self.log[start_at:]
112
    finally:
113
      self._log_lock.release()
114

    
115

    
116
class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs.

  Attributes:
    - queue: the queue object this job belongs to
    - id: the job identifier
    - ops: list of _QueuedOpCode objects, one per submitted opcode
    - run_op_index: index of the opcode most recently started, or -1 if
      none has been started yet

  """
  def __init__(self, queue, job_id, ops):
    """Creates a new job from a non-empty list of opcodes.

    @param queue: The queue object the job belongs to.
    @param job_id: The job identifier.
    @param ops: The opcodes making up the job; must not be empty.

    """
    if not ops:
      # TODO
      raise Exception("No opcodes")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.run_op_index = -1

  @classmethod
  def Restore(cls, queue, state):
    """Rebuilds a job from the dictionary produced by Serialize.

    @param queue: The queue object the job belongs to.
    @type state: dict
    @param state: Dictionary with the keys "id", "ops" and
                  "run_op_index".

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.ops = [_QueuedOpCode.Restore(op_state) for op_state in state["ops"]]
    obj.run_op_index = state["run_op_index"]
    return obj

  def Serialize(self):
    """Returns a dictionary describing this job and its opcodes."""
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "run_op_index": self.run_op_index,
      }

  def CalcStatus(self):
    """Derives the job status from the status of its opcodes.

    The job is successful only if every opcode succeeded. Otherwise the
    opcodes are examined in order: a running opcode makes the job
    running, and the first failed or canceled opcode decides the result
    and stops the scan. A job with only queued (and possibly successful)
    opcodes is reported as queued.

    @return: One of the constants.JOB_STATUS_* values.

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        # Report the job-level constant, not the opcode-level one, so
        # that comparisons against JOB_STATUS_* values work as intended
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status
174

    
175

    
176
class _JobQueueWorker(workerpool.BaseWorker):
177
  def RunTask(self, job):
178
    """Job executor.
179

180
    This functions processes a job.
181

182
    """
183
    logging.debug("Worker %s processing job %s",
184
                  self.worker_id, job.id)
185
    proc = mcpu.Processor(self.pool.queue.context)
186
    queue = job.queue
187
    try:
188
      try:
189
        count = len(job.ops)
190
        for idx, op in enumerate(job.ops):
191
          try:
192
            logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
193

    
194
            queue.acquire()
195
            try:
196
              job.run_op_index = idx
197
              op.status = constants.OP_STATUS_RUNNING
198
              op.result = None
199
              queue.UpdateJobUnlocked(job)
200

    
201
              input_opcode = op.input
202
            finally:
203
              queue.release()
204

    
205
            result = proc.ExecOpCode(input_opcode, op.Log)
206

    
207
            queue.acquire()
208
            try:
209
              op.status = constants.OP_STATUS_SUCCESS
210
              op.result = result
211
              queue.UpdateJobUnlocked(job)
212
            finally:
213
              queue.release()
214

    
215
            logging.debug("Op %s/%s: Successfully finished %s",
216
                          idx + 1, count, op)
217
          except Exception, err:
218
            queue.acquire()
219
            try:
220
              try:
221
                op.status = constants.OP_STATUS_ERROR
222
                op.result = str(err)
223
                logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
224
              finally:
225
                queue.UpdateJobUnlocked(job)
226
            finally:
227
              queue.release()
228
            raise
229

    
230
      except errors.GenericError, err:
231
        logging.exception("Ganeti exception")
232
      except:
233
        logging.exception("Unhandled exception")
234
    finally:
235
      queue.acquire()
236
      try:
237
        job_id = job.id
238
        status = job.CalcStatus()
239
      finally:
240
        queue.release()
241
      logging.debug("Worker %s finished job %s, status = %s",
242
                    self.worker_id, job_id, status)
243

    
244

    
245
class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Worker pool running _JobQueueWorker instances for a job queue."""

  def __init__(self, queue):
    """Sets up the pool and remembers the queue it works for.

    @param queue: The queue object; workers reach it through the
                  'queue' attribute of their pool.

    """
    parent = super(_JobQueueWorkerPool, self)
    parent.__init__(JOBQUEUE_THREADS, _JobQueueWorker)
    self.queue = queue
250

    
251

    
252
class JobQueue(object):
253
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
254

    
255
  def _RequireOpenQueue(fn):
256
    """Decorator for "public" functions.
257

258
    This function should be used for all "public" functions. That is, functions
259
    usually called from other classes.
260

261
    Important: Use this decorator only after utils.LockedMethod!
262

263
    Example:
264
      @utils.LockedMethod
265
      @_RequireOpenQueue
266
      def Example(self):
267
        pass
268

269
    """
270
    def wrapper(self, *args, **kwargs):
271
      assert self._queue_lock is not None, "Queue should be open"
272
      return fn(self, *args, **kwargs)
273
    return wrapper
274

    
275
  def __init__(self, context):
276
    self.context = context
277
    self._memcache = {}
278
    self._my_hostname = utils.HostInfo().name
279

    
280
    # Locking
281
    self._lock = threading.Lock()
282
    self.acquire = self._lock.acquire
283
    self.release = self._lock.release
284

    
285
    # Initialize
286
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)
287

    
288
    # Read serial file
289
    self._last_serial = jstore.ReadSerial()
290
    assert self._last_serial is not None, ("Serial file was modified between"
291
                                           " check in jstore and here")
292

    
293
    # Get initial list of nodes
294
    self._nodes = set(self.context.cfg.GetNodeList())
295

    
296
    # Remove master node
297
    try:
298
      self._nodes.remove(self._my_hostname)
299
    except ValueError:
300
      pass
301

    
302
    # TODO: Check consistency across nodes
303

    
304
    # Setup worker pool
305
    self._wpool = _JobQueueWorkerPool(self)
306

    
307
    # We need to lock here because WorkerPool.AddTask() may start a job while
308
    # we're still doing our work.
309
    self.acquire()
310
    try:
311
      for job in self._GetJobsUnlocked(None):
312
        status = job.CalcStatus()
313

    
314
        if status in (constants.JOB_STATUS_QUEUED, ):
315
          self._wpool.AddTask(job)
316

    
317
        elif status in (constants.JOB_STATUS_RUNNING, ):
318
          logging.warning("Unfinished job %s found: %s", job.id, job)
319
          try:
320
            for op in job.ops:
321
              op.status = constants.OP_STATUS_ERROR
322
              op.result = "Unclean master daemon shutdown"
323
          finally:
324
            self.UpdateJobUnlocked(job)
325
    finally:
326
      self.release()
327

    
328
  @utils.LockedMethod
329
  @_RequireOpenQueue
330
  def AddNode(self, node_name):
331
    assert node_name != self._my_hostname
332

    
333
    # Clean queue directory on added node
334
    rpc.call_jobqueue_purge(node_name)
335

    
336
    # Upload the whole queue excluding archived jobs
337
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
338

    
339
    # Upload current serial file
340
    files.append(constants.JOB_QUEUE_SERIAL_FILE)
341

    
342
    for file_name in files:
343
      # Read file content
344
      fd = open(file_name, "r")
345
      try:
346
        content = fd.read()
347
      finally:
348
        fd.close()
349

    
350
      result = rpc.call_jobqueue_update([node_name], file_name, content)
351
      if not result[node_name]:
352
        logging.error("Failed to upload %s to %s", file_name, node_name)
353

    
354
    self._nodes.add(node_name)
355

    
356
  @utils.LockedMethod
357
  @_RequireOpenQueue
358
  def RemoveNode(self, node_name):
359
    try:
360
      # The queue is removed by the "leave node" RPC call.
361
      self._nodes.remove(node_name)
362
    except KeyError:
363
      pass
364

    
365
  def _WriteAndReplicateFileUnlocked(self, file_name, data):
366
    """Writes a file locally and then replicates it to all nodes.
367

368
    """
369
    utils.WriteFile(file_name, data=data)
370

    
371
    failed_nodes = 0
372
    result = rpc.call_jobqueue_update(self._nodes, file_name, data)
373
    for node in self._nodes:
374
      if not result[node]:
375
        failed_nodes += 1
376
        logging.error("Copy of job queue file to node %s failed", node)
377

    
378
    # TODO: check failed_nodes
379

    
380
  def _RenameFileUnlocked(self, old, new):
381
    os.rename(old, new)
382

    
383
    result = rpc.call_jobqueue_rename(self._nodes, old, new)
384
    for node in self._nodes:
385
      if not result[node]:
386
        logging.error("Moving %s to %s failed on %s", old, new, node)
387

    
388
    # TODO: check failed nodes
389

    
390
  def _FormatJobID(self, job_id):
391
    if not isinstance(job_id, (int, long)):
392
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
393
    if job_id < 0:
394
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)
395

    
396
    return str(job_id)
397

    
398
  def _NewSerialUnlocked(self):
399
    """Generates a new job identifier.
400

401
    Job identifiers are unique during the lifetime of a cluster.
402

403
    Returns: A string representing the job identifier.
404

405
    """
406
    # New number
407
    serial = self._last_serial + 1
408

    
409
    # Write to file
410
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
411
                                        "%s\n" % serial)
412

    
413
    # Keep it only if we were able to write the file
414
    self._last_serial = serial
415

    
416
    return self._FormatJobID(serial)
417

    
418
  @staticmethod
419
  def _GetJobPath(job_id):
420
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
421

    
422
  @staticmethod
423
  def _GetArchivedJobPath(job_id):
424
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)
425

    
426
  @classmethod
427
  def _ExtractJobID(cls, name):
428
    m = cls._RE_JOB_FILE.match(name)
429
    if m:
430
      return m.group(1)
431
    else:
432
      return None
433

    
434
  def _GetJobIDsUnlocked(self, archived=False):
435
    """Return all known job IDs.
436

437
    If the parameter archived is True, archived jobs IDs will be
438
    included. Currently this argument is unused.
439

440
    The method only looks at disk because it's a requirement that all
441
    jobs are present on disk (so in the _memcache we don't have any
442
    extra IDs).
443

444
    """
445
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
446
    jlist.sort()
447
    return jlist
448

    
449
  def _ListJobFiles(self):
450
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
451
            if self._RE_JOB_FILE.match(name)]
452

    
453
  def _LoadJobUnlocked(self, job_id):
454
    if job_id in self._memcache:
455
      logging.debug("Found job %s in memcache", job_id)
456
      return self._memcache[job_id]
457

    
458
    filepath = self._GetJobPath(job_id)
459
    logging.debug("Loading job from %s", filepath)
460
    try:
461
      fd = open(filepath, "r")
462
    except IOError, err:
463
      if err.errno in (errno.ENOENT, ):
464
        return None
465
      raise
466
    try:
467
      data = serializer.LoadJson(fd.read())
468
    finally:
469
      fd.close()
470

    
471
    job = _QueuedJob.Restore(self, data)
472
    self._memcache[job_id] = job
473
    logging.debug("Added job %s to the cache", job_id)
474
    return job
475

    
476
  def _GetJobsUnlocked(self, job_ids):
477
    if not job_ids:
478
      job_ids = self._GetJobIDsUnlocked()
479

    
480
    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
481

    
482
  @utils.LockedMethod
483
  @_RequireOpenQueue
484
  def SubmitJob(self, ops):
485
    """Create and store a new job.
486

487
    This enters the job into our job queue and also puts it on the new
488
    queue, in order for it to be picked up by the queue processors.
489

490
    @type ops: list
491
    @param ops: The list of OpCodes that will become the new job.
492

493
    """
494
    # Get job identifier
495
    job_id = self._NewSerialUnlocked()
496
    job = _QueuedJob(self, job_id, ops)
497

    
498
    # Write to disk
499
    self.UpdateJobUnlocked(job)
500

    
501
    logging.debug("Added new job %s to the cache", job_id)
502
    self._memcache[job_id] = job
503

    
504
    # Add to worker pool
505
    self._wpool.AddTask(job)
506

    
507
    return job.id
508

    
509
  @_RequireOpenQueue
510
  def UpdateJobUnlocked(self, job):
511
    filename = self._GetJobPath(job.id)
512
    data = serializer.DumpJson(job.Serialize(), indent=False)
513
    logging.debug("Writing job %s to %s", job.id, filename)
514
    self._WriteAndReplicateFileUnlocked(filename, data)
515
    self._CleanCacheUnlocked([job.id])
516

    
517
  def _CleanCacheUnlocked(self, exclude):
518
    """Clean the memory cache.
519

520
    The exceptions argument contains job IDs that should not be
521
    cleaned.
522

523
    """
524
    assert isinstance(exclude, list)
525

    
526
    for job in self._memcache.values():
527
      if job.id in exclude:
528
        continue
529
      if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,
530
                                  constants.JOB_STATUS_RUNNING):
531
        logging.debug("Cleaning job %s from the cache", job.id)
532
        try:
533
          del self._memcache[job.id]
534
        except KeyError:
535
          pass
536

    
537
  @utils.LockedMethod
538
  @_RequireOpenQueue
539
  def CancelJob(self, job_id):
540
    """Cancels a job.
541

542
    @type job_id: string
543
    @param job_id: Job ID of job to be cancelled.
544

545
    """
546
    logging.debug("Cancelling job %s", job_id)
547

    
548
    job = self._LoadJobUnlocked(job_id)
549
    if not job:
550
      logging.debug("Job %s not found", job_id)
551
      return
552

    
553
    if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,):
554
      logging.debug("Job %s is no longer in the queue", job.id)
555
      return
556

    
557
    try:
558
      for op in job.ops:
559
        op.status = constants.OP_STATUS_ERROR
560
        op.result = "Job cancelled by request"
561
    finally:
562
      self.UpdateJobUnlocked(job)
563

    
564
  @utils.LockedMethod
565
  @_RequireOpenQueue
566
  def ArchiveJob(self, job_id):
567
    """Archives a job.
568

569
    @type job_id: string
570
    @param job_id: Job ID of job to be archived.
571

572
    """
573
    logging.debug("Archiving job %s", job_id)
574

    
575
    job = self._LoadJobUnlocked(job_id)
576
    if not job:
577
      logging.debug("Job %s not found", job_id)
578
      return
579

    
580
    if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
581
                                constants.JOB_STATUS_SUCCESS,
582
                                constants.JOB_STATUS_ERROR):
583
      logging.debug("Job %s is not yet done", job.id)
584
      return
585

    
586
    try:
587
      old = self._GetJobPath(job.id)
588
      new = self._GetArchivedJobPath(job.id)
589

    
590
      self._RenameFileUnlocked(old, new)
591

    
592
      logging.debug("Successfully archived job %s", job.id)
593
    finally:
594
      # Cleaning the cache because we don't know what os.rename actually did
595
      # and to be on the safe side.
596
      self._CleanCacheUnlocked([])
597

    
598
  def _GetJobInfoUnlocked(self, job, fields):
599
    row = []
600
    for fname in fields:
601
      if fname == "id":
602
        row.append(job.id)
603
      elif fname == "status":
604
        row.append(job.CalcStatus())
605
      elif fname == "ops":
606
        row.append([op.input.__getstate__() for op in job.ops])
607
      elif fname == "opresult":
608
        row.append([op.result for op in job.ops])
609
      elif fname == "opstatus":
610
        row.append([op.status for op in job.ops])
611
      elif fname == "ticker":
612
        ji = job.run_op_index
613
        if ji < 0:
614
          lmsg = None
615
        else:
616
          lmsg = job.ops[ji].RetrieveLog(-1)
617
          # message might be empty here
618
          if lmsg:
619
            lmsg = lmsg[0]
620
          else:
621
            lmsg = None
622
        row.append(lmsg)
623
      else:
624
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
625
    return row
626

    
627
  @utils.LockedMethod
628
  @_RequireOpenQueue
629
  def QueryJobs(self, job_ids, fields):
630
    """Returns a list of jobs in queue.
631

632
    Args:
633
    - job_ids: Sequence of job identifiers or None for all
634
    - fields: Names of fields to return
635

636
    """
637
    jobs = []
638

    
639
    for job in self._GetJobsUnlocked(job_ids):
640
      if job is None:
641
        jobs.append(None)
642
      else:
643
        jobs.append(self._GetJobInfoUnlocked(job, fields))
644

    
645
    return jobs
646

    
647
  @utils.LockedMethod
648
  @_RequireOpenQueue
649
  def Shutdown(self):
650
    """Stops the job queue.
651

652
    """
653
    self._wpool.TerminateWorkers()
654

    
655
    self._queue_lock.Close()
656
    self._queue_lock = None