Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ 0cb94105

History | View | Annotate | Download (16.8 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the job queue handling."""
23

    
24
import os
25
import logging
26
import threading
27
import errno
28
import re
29
import time
30

    
31
from ganeti import constants
32
from ganeti import serializer
33
from ganeti import workerpool
34
from ganeti import opcodes
35
from ganeti import errors
36
from ganeti import mcpu
37
from ganeti import utils
38
from ganeti import rpc
39

    
40

    
41
# Passed as the first argument to the worker pool constructor in
# _JobQueueWorkerPool; presumably the number of worker threads -- TODO
# confirm against workerpool.WorkerPool
JOBQUEUE_THREADS = 5
42

    
43

    
44
class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  Access is synchronized by the '_lock' attribute.

  The 'log' attribute holds the execution log and consists of tuples
  of the form (timestamp, level, message).

  """
  def __init__(self, op):
    """Wraps a freshly submitted opcode.

    The opcode starts in the queued state, without a result and with an
    empty execution log.

    """
    self.__Setup(op, constants.OP_STATUS_QUEUED, None, [])

  def __Setup(self, input_, status, result, log):
    """Initializes all attributes; shared by __init__ and Restore.

    """
    self._lock = threading.Lock()
    self.input = input_
    self.status = status
    self.result = result
    self.log = log

  @classmethod
  def Restore(cls, state):
    """Rebuilds an instance from the dictionary produced by Serialize.

    The instance is created without calling __init__, so the restored
    status, result and log are used instead of the defaults.

    """
    obj = object.__new__(cls)
    obj.__Setup(opcodes.OpCode.LoadOpCode(state["input"]),
                state["status"], state["result"], state["log"])
    return obj

  @utils.LockedMethod
  def Serialize(self):
    """Returns a dictionary describing this opcode.

    The keys are "input", "status", "result" and "log"; the log list is
    returned as is, not copied.

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      }

  @utils.LockedMethod
  def GetInput(self):
    """Returns the original opcode.

    """
    return self.input

  @utils.LockedMethod
  def SetStatus(self, status, result):
    """Update the opcode status and result.

    """
    self.status = status
    self.result = result

  @utils.LockedMethod
  def GetStatus(self):
    """Get the opcode status.

    """
    return self.status

  @utils.LockedMethod
  def GetResult(self):
    """Get the opcode result.

    """
    return self.result

  @utils.LockedMethod
  def Log(self, *args):
    """Append a log entry.

    Can be called either as Log(message), in which case the entry gets
    the type constants.ELOG_MESSAGE, or as Log(log_type, message).

    """
    # Both the one- and the two-argument form are valid; the previous
    # check ("< 2") rejected the two-argument form handled below
    assert len(args) < 3

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      log_type, log_msg = args
    self.log.append((time.time(), log_type, log_msg))

  @utils.LockedMethod
  def RetrieveLog(self, start_at=0):
    """Retrieve (a part of) the execution log.

    @param start_at: index of the first entry to return; as this is used
                     as a slice start, negative values count from the end
    @return: a new list with the selected log entries

    """
    return self.log[start_at:]
128

    
129

    
130
class _QueuedJob(object):
131
  """In-memory job representation.
132

133
  This is what we use to track the user-submitted jobs.
134

135
  """
136
  def __init__(self, storage, job_id, ops):
137
    if not ops:
138
      # TODO
139
      raise Exception("No opcodes")
140

    
141
    self.__Setup(storage, job_id, [_QueuedOpCode(op) for op in ops], -1)
142

    
143
  def __Setup(self, storage, job_id, ops, run_op_index):
144
    self._lock = threading.Lock()
145
    self.storage = storage
146
    self.id = job_id
147
    self._ops = ops
148
    self.run_op_index = run_op_index
149

    
150
  @classmethod
151
  def Restore(cls, storage, state):
152
    obj = object.__new__(cls)
153
    op_list = [_QueuedOpCode.Restore(op_state) for op_state in state["ops"]]
154
    obj.__Setup(storage, state["id"], op_list, state["run_op_index"])
155
    return obj
156

    
157
  def Serialize(self):
158
    return {
159
      "id": self.id,
160
      "ops": [op.Serialize() for op in self._ops],
161
      "run_op_index": self.run_op_index,
162
      }
163

    
164
  def SetUnclean(self, msg):
165
    try:
166
      for op in self._ops:
167
        op.SetStatus(constants.OP_STATUS_ERROR, msg)
168
    finally:
169
      self.storage.UpdateJob(self)
170

    
171
  def GetStatus(self):
172
    status = constants.JOB_STATUS_QUEUED
173

    
174
    all_success = True
175
    for op in self._ops:
176
      op_status = op.GetStatus()
177
      if op_status == constants.OP_STATUS_SUCCESS:
178
        continue
179

    
180
      all_success = False
181

    
182
      if op_status == constants.OP_STATUS_QUEUED:
183
        pass
184
      elif op_status == constants.OP_STATUS_RUNNING:
185
        status = constants.JOB_STATUS_RUNNING
186
      elif op_status == constants.OP_STATUS_ERROR:
187
        status = constants.JOB_STATUS_ERROR
188
        # The whole job fails if one opcode failed
189
        break
190

    
191
    if all_success:
192
      status = constants.JOB_STATUS_SUCCESS
193

    
194
    return status
195

    
196
  @utils.LockedMethod
197
  def GetRunOpIndex(self):
198
    return self.run_op_index
199

    
200
  def Run(self, proc):
201
    """Job executor.
202

203
    This functions processes a this job in the context of given processor
204
    instance.
205

206
    Args:
207
    - proc: Ganeti Processor to run the job with
208

209
    """
210
    try:
211
      count = len(self._ops)
212
      for idx, op in enumerate(self._ops):
213
        try:
214
          logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
215

    
216
          self._lock.acquire()
217
          try:
218
            self.run_op_index = idx
219
          finally:
220
            self._lock.release()
221

    
222
          op.SetStatus(constants.OP_STATUS_RUNNING, None)
223
          self.storage.UpdateJob(self)
224

    
225
          result = proc.ExecOpCode(op.input, op.Log)
226

    
227
          op.SetStatus(constants.OP_STATUS_SUCCESS, result)
228
          self.storage.UpdateJob(self)
229
          logging.debug("Op %s/%s: Successfully finished %s",
230
                        idx + 1, count, op)
231
        except Exception, err:
232
          try:
233
            op.SetStatus(constants.OP_STATUS_ERROR, str(err))
234
            logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
235
          finally:
236
            self.storage.UpdateJob(self)
237
          raise
238

    
239
    except errors.GenericError, err:
240
      logging.error("ganeti exception %s", exc_info=err)
241
    except Exception, err:
242
      logging.error("unhandled exception %s", exc_info=err)
243
    except:
244
      logging.error("unhandled unknown exception %s", exc_info=err)
245

    
246

    
247
class _JobQueueWorker(workerpool.BaseWorker):
  """Worker executing jobs handed out by the job queue worker pool.

  """
  def RunTask(self, job):
    """Runs a single job using a new processor.

    The processor is created with the context stored on the pool. The
    final job status is logged even if running the job fails.

    """
    logging.debug("Worker %s processing job %s", self.worker_id, job.id)

    # TODO: feedback function
    processor = mcpu.Processor(self.pool.context)
    try:
      job.Run(processor)
    finally:
      logging.debug("Worker %s finished job %s, status = %s",
                    self.worker_id, job.id, job.GetStatus())
258

    
259

    
260
class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Worker pool using _JobQueueWorker workers.

  The context given at creation time is kept in the 'context' attribute,
  from where the workers read it.

  """
  def __init__(self, context):
    parent = super(_JobQueueWorkerPool, self)
    parent.__init__(JOBQUEUE_THREADS, _JobQueueWorker)

    self.context = context
265

    
266

    
267
class JobStorageBase(object):
  """Base class for job storages; recognizes and formats job IDs.

  A job ID is a number, preceded by "<prefix>-" if a non-empty ID prefix
  was given.

  """
  def __init__(self, id_prefix):
    """Stores the ID prefix and compiles the job ID pattern.

    """
    self.id_prefix = id_prefix

    escaped_prefix = ""
    if id_prefix:
      escaped_prefix = re.escape("%s-" % id_prefix)

    # Apart from the prefix, all job IDs are numeric
    self._re_job_id = re.compile(r"^%s\d+$" % escaped_prefix)

  def OwnsJobId(self, job_id):
    """Matches a job ID against the pattern of this storage.

    @return: the match object if the ID has the right form, None
             otherwise

    """
    matcher = self._re_job_id
    return matcher.match(job_id)

  def FormatJobID(self, job_id):
    """Converts a job number into a job ID string.

    The number is zero-padded to ten digits and prefixed if the storage
    has an ID prefix.

    @raise errors.ProgrammerError: if the number is not an integer or
                                   is negative

    """
    is_integer = isinstance(job_id, (int, long))
    if not is_integer:
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    prefix = ""
    if self.id_prefix:
      prefix = "%s-" % self.id_prefix

    return "%s%010d" % (prefix, job_id)
294

    
295

    
296
class DiskJobStorage(JobStorageBase):
297
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
298

    
299
  def __init__(self, id_prefix):
300
    JobStorageBase.__init__(self, id_prefix)
301

    
302
    self._lock = threading.Lock()
303
    self._memcache = {}
304
    self._my_hostname = utils.HostInfo().name
305

    
306
    # Make sure our directories exists
307
    for path in (constants.QUEUE_DIR, constants.JOB_QUEUE_ARCHIVE_DIR):
308
      try:
309
        os.mkdir(path, 0700)
310
      except OSError, err:
311
        if err.errno not in (errno.EEXIST, ):
312
          raise
313

    
314
    # Get queue lock
315
    self.lock_fd = open(constants.JOB_QUEUE_LOCK_FILE, "w")
316
    try:
317
      utils.LockFile(self.lock_fd)
318
    except:
319
      self.lock_fd.close()
320
      raise
321

    
322
    # Read version
323
    try:
324
      version_fd = open(constants.JOB_QUEUE_VERSION_FILE, "r")
325
    except IOError, err:
326
      if err.errno not in (errno.ENOENT, ):
327
        raise
328

    
329
      # Setup a new queue
330
      self._InitQueueUnlocked()
331

    
332
      # Try to open again
333
      version_fd = open(constants.JOB_QUEUE_VERSION_FILE, "r")
334

    
335
    try:
336
      # Try to read version
337
      version = int(version_fd.read(128))
338

    
339
      # Verify version
340
      if version != constants.JOB_QUEUE_VERSION:
341
        raise errors.JobQueueError("Found version %s, expected %s",
342
                                   version, constants.JOB_QUEUE_VERSION)
343
    finally:
344
      version_fd.close()
345

    
346
    self._last_serial = self._ReadSerial()
347
    if self._last_serial is None:
348
      raise errors.ConfigurationError("Can't read/parse the job queue serial"
349
                                      " file")
350

    
351
  @staticmethod
352
  def _ReadSerial():
353
    """Try to read the job serial file.
354

355
    @rtype: None or int
356
    @return: If the serial can be read, then it is returned. Otherwise None
357
             is returned.
358

359
    """
360
    try:
361
      serial_fd = open(constants.JOB_QUEUE_SERIAL_FILE, "r")
362
      try:
363
        # Read last serial
364
        serial = int(serial_fd.read(1024).strip())
365
      finally:
366
        serial_fd.close()
367
    except (ValueError, EnvironmentError):
368
      serial = None
369

    
370
    return serial
371

    
372
  def Close(self):
373
    assert self.lock_fd, "Queue should be open"
374

    
375
    self.lock_fd.close()
376
    self.lock_fd = None
377

    
378
  def _InitQueueUnlocked(self):
379
    assert self.lock_fd, "Queue should be open"
380

    
381
    utils.WriteFile(constants.JOB_QUEUE_VERSION_FILE,
382
                    data="%s\n" % constants.JOB_QUEUE_VERSION)
383
    if self._ReadSerial() is None:
384
      utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
385
                      data="%s\n" % 0)
386

    
387
  def _NewSerialUnlocked(self, nodes):
388
    """Generates a new job identifier.
389

390
    Job identifiers are unique during the lifetime of a cluster.
391

392
    Returns: A string representing the job identifier.
393

394
    """
395
    assert self.lock_fd, "Queue should be open"
396

    
397
    # New number
398
    serial = self._last_serial + 1
399

    
400
    # Write to file
401
    utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
402
                    data="%s\n" % serial)
403

    
404
    # Keep it only if we were able to write the file
405
    self._last_serial = serial
406

    
407
    # Distribute the serial to the other nodes
408
    try:
409
      nodes.remove(self._my_hostname)
410
    except ValueError:
411
      pass
412

    
413
    result = rpc.call_upload_file(nodes, constants.JOB_QUEUE_SERIAL_FILE)
414
    for node in nodes:
415
      if not result[node]:
416
        logging.error("copy of job queue file to node %s failed", node)
417

    
418
    return self.FormatJobID(serial)
419

    
420
  def _GetJobPath(self, job_id):
421
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
422

    
423
  def _GetArchivedJobPath(self, job_id):
424
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)
425

    
426
  def _GetJobIDsUnlocked(self, archived=False):
427
    """Return all known job IDs.
428

429
    If the parameter archived is True, archived jobs IDs will be
430
    included. Currently this argument is unused.
431

432
    The method only looks at disk because it's a requirement that all
433
    jobs are present on disk (so in the _memcache we don't have any
434
    extra IDs).
435

436
    """
437
    jfiles = self._ListJobFiles()
438
    jlist = [m.group(1) for m in
439
             [self._RE_JOB_FILE.match(name) for name in jfiles]]
440
    jlist.sort()
441
    return jlist
442

    
443
  def _ListJobFiles(self):
444
    assert self.lock_fd, "Queue should be open"
445

    
446
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
447
            if self._RE_JOB_FILE.match(name)]
448

    
449
  def _LoadJobUnlocked(self, job_id):
450
    assert self.lock_fd, "Queue should be open"
451

    
452
    if job_id in self._memcache:
453
      logging.debug("Found job %s in memcache", job_id)
454
      return self._memcache[job_id]
455

    
456
    filepath = self._GetJobPath(job_id)
457
    logging.debug("Loading job from %s", filepath)
458
    try:
459
      fd = open(filepath, "r")
460
    except IOError, err:
461
      if err.errno in (errno.ENOENT, ):
462
        return None
463
      raise
464
    try:
465
      data = serializer.LoadJson(fd.read())
466
    finally:
467
      fd.close()
468

    
469
    job = _QueuedJob.Restore(self, data)
470
    self._memcache[job_id] = job
471
    logging.debug("Added job %s to the cache", job_id)
472
    return job
473

    
474
  def _GetJobsUnlocked(self, job_ids):
475
    if not job_ids:
476
      job_ids = self._GetJobIDsUnlocked()
477

    
478
    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
479

    
480
  @utils.LockedMethod
481
  def GetJobs(self, job_ids):
482
    return self._GetJobsUnlocked(job_ids)
483

    
484
  @utils.LockedMethod
485
  def AddJob(self, ops, nodes):
486
    """Create and store on disk a new job.
487

488
    @type ops: list
489
    @param ops: The list of OpCodes that will become the new job.
490
    @type nodes: list
491
    @param nodes: The list of nodes to which the new job serial will be
492
                  distributed.
493

494
    """
495
    assert self.lock_fd, "Queue should be open"
496

    
497
    # Get job identifier
498
    job_id = self._NewSerialUnlocked(nodes)
499
    job = _QueuedJob(self, job_id, ops)
500

    
501
    # Write to disk
502
    self._UpdateJobUnlocked(job)
503

    
504
    logging.debug("Added new job %s to the cache", job_id)
505
    self._memcache[job_id] = job
506

    
507
    return job
508

    
509
  def _UpdateJobUnlocked(self, job):
510
    assert self.lock_fd, "Queue should be open"
511

    
512
    filename = self._GetJobPath(job.id)
513
    logging.debug("Writing job %s to %s", job.id, filename)
514
    utils.WriteFile(filename,
515
                    data=serializer.DumpJson(job.Serialize(), indent=False))
516
    self._CleanCacheUnlocked([job.id])
517

    
518
  def _CleanCacheUnlocked(self, exclude):
519
    """Clean the memory cache.
520

521
    The exceptions argument contains job IDs that should not be
522
    cleaned.
523

524
    """
525
    assert isinstance(exclude, list)
526
    for job in self._memcache.values():
527
      if job.id in exclude:
528
        continue
529
      if job.GetStatus() not in (constants.JOB_STATUS_QUEUED,
530
                                 constants.JOB_STATUS_RUNNING):
531
        logging.debug("Cleaning job %s from the cache", job.id)
532
        try:
533
          del self._memcache[job.id]
534
        except KeyError:
535
          pass
536

    
537
  @utils.LockedMethod
538
  def UpdateJob(self, job):
539
    return self._UpdateJobUnlocked(job)
540

    
541
  def ArchiveJob(self, job_id):
542
    raise NotImplementedError()
543

    
544

    
545
class JobQueue:
  """The job queue.

  Jobs are kept in a DiskJobStorage and executed by the workers of a
  _JobQueueWorkerPool.

  """
  def __init__(self, context):
    """Opens the job storage and takes care of the jobs found in it.

    Queued jobs are handed to the worker pool, while jobs found in the
    running state are marked as failed.

    """
    self._lock = threading.Lock()
    self._jobs = DiskJobStorage("")
    self._wpool = _JobQueueWorkerPool(context)

    for job in self._jobs.GetJobs(None):
      job_status = job.GetStatus()

      if job_status == constants.JOB_STATUS_QUEUED:
        self._wpool.AddTask(job)
        continue

      if job_status == constants.JOB_STATUS_RUNNING:
        logging.warning("Unfinished job %s found: %s", job.id, job)
        job.SetUnclean("Unclean master daemon shutdown")

  @utils.LockedMethod
  def SubmitJob(self, ops, nodes):
    """Add a new job to the queue.

    The job is first added to the job storage and then given to the
    worker pool, in order for it to be picked up by one of the workers.

    @type ops: list
    @param ops: the sequence of opcodes that will become the new job
    @type nodes: list
    @param nodes: the list of nodes to which the queue should be
                  distributed
    @return: the ID of the new job

    """
    new_job = self._jobs.AddJob(ops, nodes)

    # Add to worker pool
    self._wpool.AddTask(new_job)

    return new_job.id

  def ArchiveJob(self, job_id):
    """Not implemented.

    """
    raise NotImplementedError()

  def CancelJob(self, job_id):
    """Not implemented.

    """
    raise NotImplementedError()

  def _GetJobInfo(self, job, fields):
    """Computes the values of the requested fields for one job.

    The known fields are "id", "status", "ops", "opresult", "opstatus"
    and "ticker" (the last log entry of the opcode being run, if any).

    @return: a list with one value per requested field
    @raise errors.OpExecError: for an unknown field name

    """
    def _LastLogEntry():
      # Last log entry of the current opcode, None if nothing was run
      # or logged yet
      op_index = job.GetRunOpIndex()
      if op_index < 0:
        return None
      entries = job._ops[op_index].RetrieveLog(-1)
      if not entries:
        return None
      return entries[0]

    values = []
    for fname in fields:
      if fname == "id":
        value = job.id
      elif fname == "status":
        value = job.GetStatus()
      elif fname == "ops":
        value = [op.GetInput().__getstate__() for op in job._ops]
      elif fname == "opresult":
        value = [op.GetResult() for op in job._ops]
      elif fname == "opstatus":
        value = [op.GetStatus() for op in job._ops]
      elif fname == "ticker":
        value = _LastLogEntry()
      else:
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
      values.append(value)

    return values

  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    The result contains one entry per job; the entry is None for jobs
    which the storage couldn't find.

    Args:
    - job_ids: Sequence of job identifiers or None for all
    - fields: Names of fields to return

    """
    self._lock.acquire()
    try:
      rows = []
      for job in self._jobs.GetJobs(job_ids):
        info = None
        if job is not None:
          info = self._GetJobInfo(job, fields)
        rows.append(info)
      return rows
    finally:
      self._lock.release()

  @utils.LockedMethod
  def Shutdown(self):
    """Stops the job queue.

    The workers are terminated before the job storage is closed.

    """
    self._wpool.TerminateWorkers()
    self._jobs.Close()