lib/jqueue.py @ 21cc1fbd

#
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling."""

import os
import logging
import threading
import errno
import re
import time

from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import rpc


JOBQUEUE_THREADS = 5


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  Access is synchronized by the '_lock' attribute.

  The 'log' attribute holds the execution log and consists of tuples
  of the form (timestamp, level, message).

  """
  def __init__(self, op):
    self.__Setup(op, constants.OP_STATUS_QUEUED, None, [])

  def __Setup(self, input_, status, result, log):
    self._lock = threading.Lock()
    self.input = input_
    self.status = status
    self.result = result
    self.log = log

  @classmethod
  def Restore(cls, state):
    obj = object.__new__(cls)
    obj.__Setup(opcodes.OpCode.LoadOpCode(state["input"]),
                state["status"], state["result"], state["log"])
    return obj

  @utils.LockedMethod
  def Serialize(self):
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      }

  @utils.LockedMethod
  def GetInput(self):
    """Returns the original opcode.

    """
    return self.input

  @utils.LockedMethod
  def SetStatus(self, status, result):
    """Update the opcode status and result.

    """
    self.status = status
    self.result = result

  @utils.LockedMethod
  def GetStatus(self):
    """Get the opcode status.

    """
    return self.status

  @utils.LockedMethod
  def GetResult(self):
    """Get the opcode result.

    """
    return self.result

  @utils.LockedMethod
  def Log(self, *args):
    """Append a log entry.

    """
    # Called either as Log(message) or Log(log_type, message)
    assert len(args) < 3

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      log_type, log_msg = args
    self.log.append((time.time(), log_type, log_msg))

  @utils.LockedMethod
  def RetrieveLog(self, start_at=0):
    """Retrieve (a part of) the execution log.

    """
    return self.log[start_at:]

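# Illustrative sketch (not part of the original module; "some_opcode" is an
# assumed opcode instance) of how the wrapper is typically exercised:
#
#   qop = _QueuedOpCode(some_opcode)
#   qop.Log("starting")                          # (time, ELOG_MESSAGE, msg)
#   qop.Log(constants.ELOG_MESSAGE, "50% done")  # explicit log type
#   qop.RetrieveLog()          # full log: [(timestamp, type, message), ...]
#   qop.RetrieveLog(start_at=1)                  # only later entries
#
#   state = qop.Serialize()                      # plain, JSON-friendly dict
#   copy = _QueuedOpCode.Restore(state)          # round-trips status/result/log
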
class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs.

  """
  def __init__(self, storage, job_id, ops):
    if not ops:
      # TODO
      raise Exception("No opcodes")

    self.__Setup(storage, job_id, [_QueuedOpCode(op) for op in ops], -1)

  def __Setup(self, storage, job_id, ops, run_op_index):
    self._lock = threading.Lock()
    self.storage = storage
    self.id = job_id
    self._ops = ops
    self.run_op_index = run_op_index

  @classmethod
  def Restore(cls, storage, state):
    obj = object.__new__(cls)
    op_list = [_QueuedOpCode.Restore(op_state) for op_state in state["ops"]]
    obj.__Setup(storage, state["id"], op_list, state["run_op_index"])
    return obj

  def Serialize(self):
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self._ops],
      "run_op_index": self.run_op_index,
      }

  def SetUnclean(self, msg):
    try:
      for op in self._ops:
        op.SetStatus(constants.OP_STATUS_ERROR, msg)
    finally:
      self.storage.UpdateJob(self)

  def GetStatus(self):
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self._ops:
      op_status = op.GetStatus()
      if op_status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op_status == constants.OP_STATUS_QUEUED:
        pass
      elif op_status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op_status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status

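  # Illustrative note (not in the original source): with three opcodes the
  # derived job status behaves as follows:
  #   (success, success, success) -> JOB_STATUS_SUCCESS
  #   (success, running, queued)  -> JOB_STATUS_RUNNING
  #   (success, error,   queued)  -> JOB_STATUS_ERROR (first error wins)
  #   (queued,  queued,  queued)  -> JOB_STATUS_QUEUED
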
  @utils.LockedMethod
  def GetRunOpIndex(self):
    return self.run_op_index

  def Run(self, proc):
    """Job executor.

    This function processes this job in the context of the given
    processor instance.

    Args:
    - proc: Ganeti Processor to run the job with

    """
    try:
      count = len(self._ops)
      for idx, op in enumerate(self._ops):
        try:
          logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)

          self._lock.acquire()
          try:
            self.run_op_index = idx
          finally:
            self._lock.release()

          op.SetStatus(constants.OP_STATUS_RUNNING, None)
          self.storage.UpdateJob(self)

          result = proc.ExecOpCode(op.input, op.Log)

          op.SetStatus(constants.OP_STATUS_SUCCESS, result)
          self.storage.UpdateJob(self)
          logging.debug("Op %s/%s: Successfully finished %s",
                        idx + 1, count, op)
        except Exception, err:
          try:
            op.SetStatus(constants.OP_STATUS_ERROR, str(err))
            logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
          finally:
            self.storage.UpdateJob(self)
          raise

    except errors.GenericError:
      logging.exception("ganeti exception")
    except Exception:
      logging.exception("unhandled exception")
    except:
      logging.exception("unhandled unknown exception")


class _JobQueueWorker(workerpool.BaseWorker):
  def RunTask(self, job):
    logging.debug("Worker %s processing job %s",
                  self.worker_id, job.id)
    # TODO: feedback function
    proc = mcpu.Processor(self.pool.context)
    try:
      job.Run(proc)
    finally:
      logging.debug("Worker %s finished job %s, status = %s",
                    self.worker_id, job.id, job.GetStatus())


class _JobQueueWorkerPool(workerpool.WorkerPool):
  def __init__(self, context):
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.context = context


class DiskJobStorage(object):
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def __init__(self):
    self._lock = threading.Lock()
    self._memcache = {}
    self._my_hostname = utils.HostInfo().name

    # Make sure our directory exists
    try:
      os.mkdir(constants.QUEUE_DIR, 0700)
    except OSError, err:
      if err.errno not in (errno.EEXIST, ):
        raise

    # Get queue lock
    self.lock_fd = open(constants.JOB_QUEUE_LOCK_FILE, "w")
    try:
      utils.LockFile(self.lock_fd)
    except:
      self.lock_fd.close()
      raise

    # Read version
    try:
      version_fd = open(constants.JOB_QUEUE_VERSION_FILE, "r")
    except IOError, err:
      if err.errno not in (errno.ENOENT, ):
        raise

      # Setup a new queue
      self._InitQueueUnlocked()

      # Try to open again
      version_fd = open(constants.JOB_QUEUE_VERSION_FILE, "r")

    try:
      # Try to read version
      version = int(version_fd.read(128))

      # Verify version
      if version != constants.JOB_QUEUE_VERSION:
        raise errors.JobQueueError("Found version %s, expected %s" %
                                   (version, constants.JOB_QUEUE_VERSION))
    finally:
      version_fd.close()

    self._last_serial = self._ReadSerial()
    if self._last_serial is None:
      raise errors.ConfigurationError("Can't read/parse the job queue serial"
                                      " file")

  @staticmethod
  def _ReadSerial():
    """Try to read the job serial file.

    @rtype: None or int
    @return: If the serial can be read, then it is returned. Otherwise None
             is returned.

    """
    try:
      serial_fd = open(constants.JOB_QUEUE_SERIAL_FILE, "r")
      try:
        # Read last serial
        serial = int(serial_fd.read(1024).strip())
      finally:
        serial_fd.close()
    except (ValueError, EnvironmentError):
      serial = None

    return serial

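  # Illustrative note (not in the original source): the serial file holds a
  # single decimal number followed by a newline, e.g. "42\n"; _ReadSerial()
  # then returns the integer 42, or None if the file is missing or malformed.
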
  def Close(self):
    assert self.lock_fd, "Queue should be open"

    self.lock_fd.close()
    self.lock_fd = None

  def _InitQueueUnlocked(self):
    assert self.lock_fd, "Queue should be open"

    utils.WriteFile(constants.JOB_QUEUE_VERSION_FILE,
                    data="%s\n" % constants.JOB_QUEUE_VERSION)
    if self._ReadSerial() is None:
      utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
                      data="%s\n" % 0)

  def _NewSerialUnlocked(self, nodes):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    Returns: A string representing the job identifier.

    """
    assert self.lock_fd, "Queue should be open"

    # New number
    serial = self._last_serial + 1

    # Write to file
    utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
                    data="%s\n" % serial)

    # Keep it only if we were able to write the file
    self._last_serial = serial

    # Distribute the serial to the other nodes
    try:
      nodes.remove(self._my_hostname)
    except ValueError:
      pass

    result = rpc.call_upload_file(nodes, constants.JOB_QUEUE_SERIAL_FILE)
    for node in nodes:
      if not result[node]:
        logging.error("copy of job queue file to node %s failed", node)

    return str(serial)

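  # Illustrative sketch (not in the original source; the node names are
  # assumptions): if _last_serial is 41, a call such as
  #   job_id = self._NewSerialUnlocked(["node1", "node2"])
  # writes "42\n" to the serial file, pushes that file to the other nodes
  # via rpc.call_upload_file and returns the string "42".
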
  def _GetJobPath(self, job_id):
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)

  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived job IDs will be
    included. Currently this argument is unused.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    """
    jfiles = self._ListJobFiles()
    jlist = [int(m.group(1)) for m in
             [self._RE_JOB_FILE.match(name) for name in jfiles]]
    jlist.sort()
    return jlist

  def _ListJobFiles(self):
    assert self.lock_fd, "Queue should be open"

    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
            if self._RE_JOB_FILE.match(name)]

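  # Illustrative note (not in the original source): job files live directly
  # in QUEUE_DIR and are named after their serial, so a queue containing the
  # files "job-3", "job-10" and "job-25" makes _GetJobIDsUnlocked() return
  # the sorted integer list [3, 10, 25].
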
  def _LoadJobUnlocked(self, job_id):
    assert self.lock_fd, "Queue should be open"

    if job_id in self._memcache:
      logging.debug("Found job %s in memcache", job_id)
      return self._memcache[job_id]

    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      fd = open(filepath, "r")
    except IOError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise
    try:
      data = serializer.LoadJson(fd.read())
    finally:
      fd.close()

    job = _QueuedJob.Restore(self, data)
    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _GetJobsUnlocked(self, job_ids):
    if not job_ids:
      job_ids = self._GetJobIDsUnlocked()

    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]

  @utils.LockedMethod
  def GetJobs(self, job_ids):
    return self._GetJobsUnlocked(job_ids)

  @utils.LockedMethod
  def AddJob(self, ops, nodes):
    """Create and store on disk a new job.

    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @type nodes: list
    @param nodes: The list of nodes to which the new job serial will be
                  distributed.

    """
    assert self.lock_fd, "Queue should be open"

    # Get job identifier
    job_id = self._NewSerialUnlocked(nodes)
    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self._UpdateJobUnlocked(job)

    logging.debug("Added new job %s to the cache", job_id)
    self._memcache[job_id] = job

    return job

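  # Illustrative sketch (not in the original source; the opcodes and node
  # name are assumptions):
  #   storage = DiskJobStorage()
  #   job = storage.AddJob([op1, op2], ["node1.example.com"])
  #   job.id    # e.g. "42", also used in the on-disk file name "job-42"
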
  def _UpdateJobUnlocked(self, job):
    assert self.lock_fd, "Queue should be open"

    filename = self._GetJobPath(job.id)
    logging.debug("Writing job %s to %s", job.id, filename)
    utils.WriteFile(filename,
                    data=serializer.DumpJson(job.Serialize(), indent=False))
    self._CleanCacheUnlocked([job.id])

  def _CleanCacheUnlocked(self, exclude):
    """Clean the memory cache.

    The exclude argument contains job IDs that should not be
    cleaned.

    """
    assert isinstance(exclude, list)
    for job in self._memcache.values():
      if job.id in exclude:
        continue
      if job.GetStatus() not in (constants.JOB_STATUS_QUEUED,
                                 constants.JOB_STATUS_RUNNING):
        logging.debug("Cleaning job %s from the cache", job.id)
        try:
          del self._memcache[job.id]
        except KeyError:
          pass

  @utils.LockedMethod
  def UpdateJob(self, job):
    return self._UpdateJobUnlocked(job)

  def ArchiveJob(self, job_id):
    raise NotImplementedError()


class JobQueue:
  """The job queue.

  """
  def __init__(self, context):
    self._lock = threading.Lock()
    self._jobs = DiskJobStorage()
    self._wpool = _JobQueueWorkerPool(context)

    for job in self._jobs.GetJobs(None):
      status = job.GetStatus()
      if status in (constants.JOB_STATUS_QUEUED, ):
        self._wpool.AddTask(job)

      elif status in (constants.JOB_STATUS_RUNNING, ):
        logging.warning("Unfinished job %s found: %s", job.id, job)
        job.SetUnclean("Unclean master daemon shutdown")

  @utils.LockedMethod
  def SubmitJob(self, ops, nodes):
    """Add a new job to the queue.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type ops: list
    @param ops: the sequence of opcodes that will become the new job
    @type nodes: list
    @param nodes: the list of nodes to which the queue should be
                  distributed

    """
    job = self._jobs.AddJob(ops, nodes)

    # Add to worker pool
    self._wpool.AddTask(job)

    return job.id

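  # Illustrative sketch (not in the original source; queue_context and the
  # opcode list are assumptions):
  #   queue = JobQueue(queue_context)
  #   job_id = queue.SubmitJob([op1, op2], ["node1.example.com"])
  #   # a _JobQueueWorker thread picks the job up and runs its opcodes
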
  def ArchiveJob(self, job_id):
    raise NotImplementedError()

  def CancelJob(self, job_id):
    raise NotImplementedError()

  def _GetJobInfo(self, job, fields):
    row = []
    for fname in fields:
      if fname == "id":
        row.append(job.id)
      elif fname == "status":
        row.append(job.GetStatus())
      elif fname == "ops":
        row.append([op.GetInput().__getstate__() for op in job._ops])
      elif fname == "opresult":
        row.append([op.GetResult() for op in job._ops])
      elif fname == "opstatus":
        row.append([op.GetStatus() for op in job._ops])
      elif fname == "ticker":
        ji = job.GetRunOpIndex()
        if ji < 0:
          lmsg = None
        else:
          lmsg = job._ops[ji].RetrieveLog(-1)
          # message might be empty here
          if lmsg:
            lmsg = lmsg[0]
          else:
            lmsg = None
        row.append(lmsg)
      else:
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
    return row

  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    Args:
    - job_ids: Sequence of job identifiers or None for all
    - fields: Names of fields to return

    """
    self._lock.acquire()
    try:
      jobs = []

      for job in self._jobs.GetJobs(job_ids):
        if job is None:
          jobs.append(None)
        else:
          jobs.append(self._GetJobInfo(job, fields))

      return jobs
    finally:
      self._lock.release()

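  # Illustrative sketch (not in the original source; the job IDs shown are
  # assumptions):
  #   rows = queue.QueryJobs(["1", "7"], ["id", "status", "opstatus"])
  #   # each row is e.g. ["1", <JOB_STATUS_* value>, [<OP_STATUS_* values>]];
  #   # unknown job IDs produce None entries in the result list
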
  @utils.LockedMethod
  def Shutdown(self):
    """Stops the job queue.

    """
    self._wpool.TerminateWorkers()
    self._jobs.Close()