root / lib / jqueue.py @ db37da70


#
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling."""

import os
import logging
import threading
import errno
import re
import time

from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import rpc


JOBQUEUE_THREADS = 5


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  Access is synchronized by the '_lock' attribute.

  The 'log' attribute holds the execution log and consists of tuples
  of the form (timestamp, level, message).
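
  Example (an illustrative sketch; 'some_op' stands for any
  opcodes.OpCode instance and is not defined here):
    qop = _QueuedOpCode(some_op)
    qop.Log("starting")                          # level defaults to ELOG_MESSAGE
    qop.Log(constants.ELOG_MESSAGE, "50% done")  # explicit (level, message) form
    qop.RetrieveLog()  # -> [(timestamp, level, message), ...]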

  """
  def __new__(cls, *args, **kwargs):
    obj = object.__new__(cls, *args, **kwargs)
    # Create a special lock for logging; this is done in __new__ rather than
    # __init__ so that the lock also exists on instances created via Restore,
    # which bypasses __init__
    obj._log_lock = threading.Lock()
    return obj

  def __init__(self, op):
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []

  @classmethod
  def Restore(cls, state):
    """Restore an opcode from its serialized state."""
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    return obj

  def Serialize(self):
    """Return a dict representation suitable for serialization."""
    self._log_lock.acquire()
    try:
      return {
        "input": self.input.__getstate__(),
        "status": self.status,
        "result": self.result,
        "log": self.log,
        }
    finally:
      self._log_lock.release()

  def Log(self, *args):
    """Append a log entry.

    """
    assert len(args) < 3

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      log_type, log_msg = args

    self._log_lock.acquire()
    try:
      self.log.append((time.time(), log_type, log_msg))
    finally:
      self._log_lock.release()

  def RetrieveLog(self, start_at=0):
    """Retrieve (a part of) the execution log.

    """
    self._log_lock.acquire()
    try:
      return self.log[start_at:]
    finally:
      self._log_lock.release()


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs.

  """
  def __init__(self, queue, job_id, ops):
    if not ops:
      # TODO: use a more specific exception class
      raise errors.GenericError("A job needs at least one opcode")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.run_op_index = -1

  @classmethod
  def Restore(cls, queue, state):
    """Restore a job from its serialized state."""
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.ops = [_QueuedOpCode.Restore(op_state) for op_state in state["ops"]]
    obj.run_op_index = state["run_op_index"]
    return obj

  def Serialize(self):
    """Return a dict representation suitable for serialization."""
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "run_op_index": self.run_op_index,
      }

  def CalcStatus(self):
    """Compute the overall job status from the opcode statuses."""
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status
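
  # Illustration of the aggregation above (not executed): opcode statuses
  # [SUCCESS, RUNNING, QUEUED] yield JOB_STATUS_RUNNING, while
  # [SUCCESS, ERROR, QUEUED] yields JOB_STATUS_ERROR, because the loop
  # stops at the first failed opcode.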


class _JobQueueWorker(workerpool.BaseWorker):
  def RunTask(self, job):
    """Job executor.

    This function processes a job.

    """
    logging.debug("Worker %s processing job %s",
                  self.worker_id, job.id)
    proc = mcpu.Processor(self.pool.queue.context)
    queue = job.queue
    try:
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          try:
            logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)

            queue.acquire()
            try:
              job.run_op_index = idx
              op.status = constants.OP_STATUS_RUNNING
              op.result = None
              queue.UpdateJobUnlocked(job)

              input_opcode = op.input
            finally:
              queue.release()

            result = proc.ExecOpCode(input_opcode, op.Log)

            queue.acquire()
            try:
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.debug("Op %s/%s: Successfully finished %s",
                          idx + 1, count, op)
          except Exception, err:
            queue.acquire()
            try:
              try:
                op.status = constants.OP_STATUS_ERROR
                op.result = str(err)
                logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
              finally:
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
    finally:
      queue.acquire()
      try:
        job_id = job.id
        status = job.CalcStatus()
      finally:
        queue.release()
      logging.debug("Worker %s finished job %s, status = %s",
                    self.worker_id, job_id, status)


class _JobQueueWorkerPool(workerpool.WorkerPool):
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


class JobQueue(object):
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def _RequireOpenQueue(fn):
    """Decorator for "public" functions.

    This decorator should be used for all "public" functions, i.e. functions
    usually called from other classes.

    Important: Use this decorator only after utils.LockedMethod!

    Example:
      @utils.LockedMethod
      @_RequireOpenQueue
      def Example(self):
        pass

    """
    def wrapper(self, *args, **kwargs):
      assert self.lock_fd, "Queue should be open"
      return fn(self, *args, **kwargs)
    return wrapper

  def __init__(self, context):
    self.context = context
    self._memcache = {}
    self._my_hostname = utils.HostInfo().name

    # Locking
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Make sure our directories exist
    for path in (constants.QUEUE_DIR, constants.JOB_QUEUE_ARCHIVE_DIR):
      try:
        os.mkdir(path, 0700)
      except OSError, err:
        if err.errno not in (errno.EEXIST, ):
          raise

    # Get queue lock
    self.lock_fd = open(constants.JOB_QUEUE_LOCK_FILE, "w")
    try:
      utils.LockFile(self.lock_fd)
    except:
      self.lock_fd.close()
      raise

    # Read version
    try:
      version_fd = open(constants.JOB_QUEUE_VERSION_FILE, "r")
    except IOError, err:
      if err.errno not in (errno.ENOENT, ):
        raise

      # Setup a new queue
      self._InitQueueUnlocked()

      # Try to open again
      version_fd = open(constants.JOB_QUEUE_VERSION_FILE, "r")

    try:
      # Try to read version
      version = int(version_fd.read(128))

      # Verify version
      if version != constants.JOB_QUEUE_VERSION:
        raise errors.JobQueueError("Found version %s, expected %s" %
                                   (version, constants.JOB_QUEUE_VERSION))
    finally:
      version_fd.close()

    self._last_serial = self._ReadSerial()
    if self._last_serial is None:
      raise errors.ConfigurationError("Can't read/parse the job queue serial"
                                      " file")

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)

    # We need to lock here because WorkerPool.AddTask() may start a job while
    # we're still doing our work.
    self.acquire()
    try:
      for job in self._GetJobsUnlocked(None):
        status = job.CalcStatus()

        if status in (constants.JOB_STATUS_QUEUED, ):
          self._wpool.AddTask(job)

        elif status in (constants.JOB_STATUS_RUNNING, ):
          logging.warning("Unfinished job %s found: %s", job.id, job)
          try:
            for op in job.ops:
              op.status = constants.OP_STATUS_ERROR
              op.result = "Unclean master daemon shutdown"
          finally:
            self.UpdateJobUnlocked(job)
    finally:
      self.release()

  @staticmethod
  def _ReadSerial():
    """Try to read the job serial file.

    @rtype: None or int
    @return: If the serial can be read, then it is returned. Otherwise None
             is returned.

    """
    try:
      serial_fd = open(constants.JOB_QUEUE_SERIAL_FILE, "r")
      try:
        # Read last serial
        serial = int(serial_fd.read(1024).strip())
      finally:
        serial_fd.close()
    except (ValueError, EnvironmentError):
      serial = None

    return serial
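
  # On-disk format sketch: the serial file holds only the last used job
  # number, so a file containing e.g. "42\n" means the next submitted job
  # gets the ID "43" (see _NewSerialUnlocked below).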

  def _InitQueueUnlocked(self):
    utils.WriteFile(constants.JOB_QUEUE_VERSION_FILE,
                    data="%s\n" % constants.JOB_QUEUE_VERSION)
    if self._ReadSerial() is None:
      utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
                      data="%s\n" % 0)

  def _FormatJobID(self, job_id):
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

  def _NewSerialUnlocked(self, nodes):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    Returns: A string representing the job identifier.

    """
    # New number
    serial = self._last_serial + 1

    # Write to file
    utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
                    data="%s\n" % serial)

    # Keep it only if we were able to write the file
    self._last_serial = serial

    # Distribute the serial to the other nodes
    try:
      nodes.remove(self._my_hostname)
    except ValueError:
      pass

    result = rpc.call_upload_file(nodes, constants.JOB_QUEUE_SERIAL_FILE)
    for node in nodes:
      if not result[node]:
        logging.error("Copy of job queue file to node %s failed", node)

    return self._FormatJobID(serial)

  @staticmethod
  def _GetJobPath(job_id):
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)

  @staticmethod
  def _GetArchivedJobPath(job_id):
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)

  @classmethod
  def _ExtractJobID(cls, name):
    m = cls._RE_JOB_FILE.match(name)
    if m:
      return m.group(1)
    else:
      return None

  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived job IDs will be
    included. Currently this argument is unused.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    """
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
    jlist.sort()
    return jlist

  def _ListJobFiles(self):
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
            if self._RE_JOB_FILE.match(name)]

  def _LoadJobUnlocked(self, job_id):
    if job_id in self._memcache:
      logging.debug("Found job %s in memcache", job_id)
      return self._memcache[job_id]

    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      fd = open(filepath, "r")
    except IOError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise
    try:
      data = serializer.LoadJson(fd.read())
    finally:
      fd.close()

    job = _QueuedJob.Restore(self, data)
    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _GetJobsUnlocked(self, job_ids):
    if not job_ids:
      job_ids = self._GetJobIDsUnlocked()

    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitJob(self, ops, nodes):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @type nodes: list
    @param nodes: The list of nodes to which the new job serial will be
                  distributed.

    """
    # Get job identifier
    job_id = self._NewSerialUnlocked(nodes)
    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self.UpdateJobUnlocked(job)

    logging.debug("Added new job %s to the cache", job_id)
    self._memcache[job_id] = job

    # Add to worker pool
    self._wpool.AddTask(job)

    return job.id
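
  # Illustrative call sequence (a sketch; "op" stands for a real
  # opcodes.OpCode instance and "node_list" for the cluster's node names):
  #   job_id = queue.SubmitJob([op], node_list)
  #   queue.QueryJobs([job_id], ["status"])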

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job):
    filename = self._GetJobPath(job.id)
    logging.debug("Writing job %s to %s", job.id, filename)
    utils.WriteFile(filename,
                    data=serializer.DumpJson(job.Serialize(), indent=False))
    self._CleanCacheUnlocked([job.id])

  def _CleanCacheUnlocked(self, exclude):
    """Clean the memory cache.

    The exclude argument contains job IDs that should not be
    cleaned.

    """
    assert isinstance(exclude, list)

    for job in self._memcache.values():
      if job.id in exclude:
        continue
      if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,
                                  constants.JOB_STATUS_RUNNING):
        logging.debug("Cleaning job %s from the cache", job.id)
        try:
          del self._memcache[job.id]
        except KeyError:
          pass

  @utils.LockedMethod
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    @type job_id: string
    @param job_id: Job ID of job to be cancelled.

    """
    logging.debug("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return

    if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,):
      logging.debug("Job %s is no longer in the queue", job.id)
      return

    try:
      for op in job.ops:
        op.status = constants.OP_STATUS_ERROR
        op.result = "Job cancelled by request"
    finally:
      self.UpdateJobUnlocked(job)

  @utils.LockedMethod
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    @type job_id: string
    @param job_id: Job ID of job to be archived.

    """
    logging.debug("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return

    if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
                                constants.JOB_STATUS_SUCCESS,
                                constants.JOB_STATUS_ERROR):
      logging.debug("Job %s is not yet done", job.id)
      return

    try:
      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)

      os.rename(old, new)

      logging.debug("Successfully archived job %s", job.id)
    finally:
      # Clean the cache, since we don't know whether os.rename actually
      # succeeded, and to be on the safe side.
      self._CleanCacheUnlocked([])

  def _GetJobInfoUnlocked(self, job, fields):
    row = []
    for fname in fields:
      if fname == "id":
        row.append(job.id)
      elif fname == "status":
        row.append(job.CalcStatus())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in job.ops])
      elif fname == "opresult":
        row.append([op.result for op in job.ops])
      elif fname == "opstatus":
        row.append([op.status for op in job.ops])
      elif fname == "ticker":
        ji = job.run_op_index
        if ji < 0:
          lmsg = None
        else:
          lmsg = job.ops[ji].RetrieveLog(-1)
          # message might be empty here
          if lmsg:
            lmsg = lmsg[0]
          else:
            lmsg = None
        row.append(lmsg)
      else:
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
    return row

  @utils.LockedMethod
  @_RequireOpenQueue
  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    Args:
    - job_ids: Sequence of job identifiers or None for all
    - fields: Names of fields to return

    """
    jobs = []

    for job in self._GetJobsUnlocked(job_ids):
      if job is None:
        jobs.append(None)
      else:
        jobs.append(self._GetJobInfoUnlocked(job, fields))

    return jobs
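
  # For example (illustrative values), QueryJobs(["1", "3"], ["id", "status"])
  # returns one row per requested job and None for missing ones, e.g.
  # [["1", "success"], None].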

  @utils.LockedMethod
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    """
    self._wpool.TerminateWorkers()

    self.lock_fd.close()
    self.lock_fd = None