#
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling."""

import os
import logging
import threading
import errno
import re
import time

from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import rpc


JOBQUEUE_THREADS = 5
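# Note (an inference from how the constant is used below, not original
# documentation): this is the number of workers created by
# _JobQueueWorkerPool, so it also bounds how many jobs run concurrently.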


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  Access is synchronized by the '_lock' attribute.

  The 'log' attribute holds the execution log and consists of tuples
  of the form (timestamp, level, message).

  """
  def __init__(self, op):
    self.__Setup(op, constants.OP_STATUS_QUEUED, None, [])

  def __Setup(self, input_, status, result, log):
    self._lock = threading.Lock()
    self.input = input_
    self.status = status
    self.result = result
    self.log = log

  @classmethod
  def Restore(cls, state):
    obj = object.__new__(cls)
    obj.__Setup(opcodes.OpCode.LoadOpCode(state["input"]),
                state["status"], state["result"], state["log"])
    return obj

  @utils.LockedMethod
  def Serialize(self):
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      }

  @utils.LockedMethod
  def GetInput(self):
    """Returns the original opcode.

    """
    return self.input

  @utils.LockedMethod
  def SetStatus(self, status, result):
    """Update the opcode status and result.

    """
    self.status = status
    self.result = result

  @utils.LockedMethod
  def GetStatus(self):
    """Get the opcode status.

    """
    return self.status

  @utils.LockedMethod
  def GetResult(self):
    """Get the opcode result.

    """
    return self.result

  @utils.LockedMethod
  def Log(self, *args):
    """Append a log entry.

    """
    # Accept either (message,) or (log_type, message)
    assert len(args) < 3

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      log_type, log_msg = args
    self.log.append((time.time(), log_type, log_msg))

  @utils.LockedMethod
  def RetrieveLog(self, start_at=0):
    """Retrieve (a part of) the execution log.

    """
    return self.log[start_at:]
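
# Usage sketch for _QueuedOpCode (illustrative; the variable names and the
# timestamp value are placeholders): an entry appended with
# Log("some message") or Log(constants.ELOG_MESSAGE, "some message") is
# stored as a tuple like (1199145600.0, constants.ELOG_MESSAGE, "some
# message"), and Serialize()/Restore() round-trip the wrapped opcode through
# plain dictionaries:
#
#   state = qop.Serialize()
#   qop_copy = _QueuedOpCode.Restore(state)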


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs.

  """
  def __init__(self, storage, job_id, ops):
    if not ops:
      # TODO
      raise Exception("No opcodes")

    self.__Setup(storage, job_id, [_QueuedOpCode(op) for op in ops], -1)

  def __Setup(self, storage, job_id, ops, run_op_index):
    self._lock = threading.Lock()
    self.storage = storage
    self.id = job_id
    self._ops = ops
    self.run_op_index = run_op_index

  @classmethod
  def Restore(cls, storage, state):
    obj = object.__new__(cls)
    op_list = [_QueuedOpCode.Restore(op_state) for op_state in state["ops"]]
    obj.__Setup(storage, state["id"], op_list, state["run_op_index"])
    return obj

  def Serialize(self):
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self._ops],
      "run_op_index": self.run_op_index,
      }

  def SetUnclean(self, msg):
    try:
      for op in self._ops:
        op.SetStatus(constants.OP_STATUS_ERROR, msg)
    finally:
      self.storage.UpdateJob(self)

  def GetStatus(self):
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self._ops:
      op_status = op.GetStatus()
      if op_status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op_status == constants.OP_STATUS_QUEUED:
        pass
      elif op_status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op_status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status
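
  # Aggregation example (derived from the loop above): opcode statuses
  # (SUCCESS, RUNNING, QUEUED) yield JOB_STATUS_RUNNING for the job, any
  # OP_STATUS_ERROR makes the whole job JOB_STATUS_ERROR, and only when
  # every opcode is OP_STATUS_SUCCESS does the job report JOB_STATUS_SUCCESS.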

  @utils.LockedMethod
  def GetRunOpIndex(self):
    return self.run_op_index

  def Run(self, proc):
    """Job executor.

    This function processes this job in the context of the given
    processor instance.

    Args:
    - proc: Ganeti Processor to run the job with

    """
    try:
      count = len(self._ops)
      for idx, op in enumerate(self._ops):
        try:
          logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)

          self._lock.acquire()
          try:
            self.run_op_index = idx
          finally:
            self._lock.release()

          op.SetStatus(constants.OP_STATUS_RUNNING, None)
          self.storage.UpdateJob(self)

          result = proc.ExecOpCode(op.input, op.Log)

          op.SetStatus(constants.OP_STATUS_SUCCESS, result)
          self.storage.UpdateJob(self)
          logging.debug("Op %s/%s: Successfully finished %s",
                        idx + 1, count, op)
        except Exception, err:
          try:
            op.SetStatus(constants.OP_STATUS_ERROR, str(err))
            logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
          finally:
            self.storage.UpdateJob(self)
          raise

    except errors.GenericError, err:
      logging.error("ganeti exception %s", err, exc_info=True)
    except Exception, err:
      logging.error("unhandled exception %s", err, exc_info=True)
    except:
      logging.error("unhandled unknown exception", exc_info=True)


class _JobQueueWorker(workerpool.BaseWorker):
  def RunTask(self, job):
    logging.debug("Worker %s processing job %s",
                  self.worker_id, job.id)
    # TODO: feedback function
    proc = mcpu.Processor(self.pool.context)
    try:
      job.Run(proc)
    finally:
      logging.debug("Worker %s finished job %s, status = %s",
                    self.worker_id, job.id, job.GetStatus())


class _JobQueueWorkerPool(workerpool.WorkerPool):
  def __init__(self, context):
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.context = context


class JobStorageBase(object):
  def __init__(self, id_prefix):
    self.id_prefix = id_prefix

    if id_prefix:
      prefix_pattern = re.escape("%s-" % id_prefix)
    else:
      prefix_pattern = ""

    # Apart from the prefix, all job IDs are numeric
    self._re_job_id = re.compile(r"^%s\d+$" % prefix_pattern)

  def OwnsJobId(self, job_id):
    return self._re_job_id.match(job_id)

  def FormatJobID(self, job_id):
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    if self.id_prefix:
      prefix = "%s-" % self.id_prefix
    else:
      prefix = ""

    return "%s%010d" % (prefix, job_id)

  def _ShouldJobBeArchivedUnlocked(self, job):
    if job.GetStatus() not in (constants.JOB_STATUS_CANCELED,
                               constants.JOB_STATUS_SUCCESS,
                               constants.JOB_STATUS_ERROR):
      logging.debug("Job %s is not yet done", job.id)
      return False
    return True


class DiskJobStorage(JobStorageBase):
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def __init__(self, id_prefix):
    JobStorageBase.__init__(self, id_prefix)

    self._lock = threading.Lock()
    self._memcache = {}
    self._my_hostname = utils.HostInfo().name

    # Make sure our directories exist
    for path in (constants.QUEUE_DIR, constants.JOB_QUEUE_ARCHIVE_DIR):
      try:
        os.mkdir(path, 0700)
      except OSError, err:
        if err.errno not in (errno.EEXIST, ):
          raise

    # Get queue lock
    self.lock_fd = open(constants.JOB_QUEUE_LOCK_FILE, "w")
    try:
      utils.LockFile(self.lock_fd)
    except:
      self.lock_fd.close()
      raise

    # Read version
    try:
      version_fd = open(constants.JOB_QUEUE_VERSION_FILE, "r")
    except IOError, err:
      if err.errno not in (errno.ENOENT, ):
        raise

      # Set up a new queue
      self._InitQueueUnlocked()

      # Try to open again
      version_fd = open(constants.JOB_QUEUE_VERSION_FILE, "r")

    try:
      # Try to read version
      version = int(version_fd.read(128))

      # Verify version
      if version != constants.JOB_QUEUE_VERSION:
        raise errors.JobQueueError("Found version %s, expected %s" %
                                   (version, constants.JOB_QUEUE_VERSION))
    finally:
      version_fd.close()

    self._last_serial = self._ReadSerial()
    if self._last_serial is None:
      raise errors.ConfigurationError("Can't read/parse the job queue serial"
                                      " file")

  @staticmethod
  def _ReadSerial():
    """Try to read the job serial file.

    @rtype: None or int
    @return: If the serial can be read, then it is returned. Otherwise None
             is returned.

    """
    try:
      serial_fd = open(constants.JOB_QUEUE_SERIAL_FILE, "r")
      try:
        # Read last serial
        serial = int(serial_fd.read(1024).strip())
      finally:
        serial_fd.close()
    except (ValueError, EnvironmentError):
      serial = None

    return serial

  def Close(self):
    assert self.lock_fd, "Queue should be open"

    self.lock_fd.close()
    self.lock_fd = None

  def _InitQueueUnlocked(self):
    assert self.lock_fd, "Queue should be open"

    utils.WriteFile(constants.JOB_QUEUE_VERSION_FILE,
                    data="%s\n" % constants.JOB_QUEUE_VERSION)
    if self._ReadSerial() is None:
      utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
                      data="%s\n" % 0)

  def _NewSerialUnlocked(self, nodes):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    Returns: A string representing the job identifier.

    """
    assert self.lock_fd, "Queue should be open"

    # New number
    serial = self._last_serial + 1

    # Write to file
    utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
                    data="%s\n" % serial)

    # Keep it only if we were able to write the file
    self._last_serial = serial

    # Distribute the serial to the other nodes
    try:
      nodes.remove(self._my_hostname)
    except ValueError:
      pass

    result = rpc.call_upload_file(nodes, constants.JOB_QUEUE_SERIAL_FILE)
    for node in nodes:
      if not result[node]:
        logging.error("copy of job queue file to node %s failed", node)

    return self.FormatJobID(serial)

  def _GetJobPath(self, job_id):
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)

  def _GetArchivedJobPath(self, job_id):
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)
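
  # Example (illustrative; the real directories come from the constants): a
  # job formatted as "0000000042" lives in <QUEUE_DIR>/job-0000000042 and,
  # once archived, is renamed to <JOB_QUEUE_ARCHIVE_DIR>/job-0000000042.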

  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived job IDs will be
    included. Currently this argument is unused.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    """
    jfiles = self._ListJobFiles()
    jlist = [m.group(1) for m in
             [self._RE_JOB_FILE.match(name) for name in jfiles]]
    jlist.sort()
    return jlist

  def _ListJobFiles(self):
    assert self.lock_fd, "Queue should be open"

    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
            if self._RE_JOB_FILE.match(name)]

  def _LoadJobUnlocked(self, job_id):
    assert self.lock_fd, "Queue should be open"

    if job_id in self._memcache:
      logging.debug("Found job %s in memcache", job_id)
      return self._memcache[job_id]

    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      fd = open(filepath, "r")
    except IOError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise
    try:
      data = serializer.LoadJson(fd.read())
    finally:
      fd.close()

    job = _QueuedJob.Restore(self, data)
    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _GetJobsUnlocked(self, job_ids):
    if not job_ids:
      job_ids = self._GetJobIDsUnlocked()

    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]

  @utils.LockedMethod
  def GetJobs(self, job_ids):
    return self._GetJobsUnlocked(job_ids)

  @utils.LockedMethod
  def AddJob(self, ops, nodes):
    """Create and store on disk a new job.

    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @type nodes: list
    @param nodes: The list of nodes to which the new job serial will be
                  distributed.

    """
    assert self.lock_fd, "Queue should be open"

    # Get job identifier
    job_id = self._NewSerialUnlocked(nodes)
    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self._UpdateJobUnlocked(job)

    logging.debug("Added new job %s to the cache", job_id)
    self._memcache[job_id] = job

    return job

  def _UpdateJobUnlocked(self, job):
    assert self.lock_fd, "Queue should be open"

    filename = self._GetJobPath(job.id)
    logging.debug("Writing job %s to %s", job.id, filename)
    utils.WriteFile(filename,
                    data=serializer.DumpJson(job.Serialize(), indent=False))
    self._CleanCacheUnlocked([job.id])

  def _CleanCacheUnlocked(self, exclude):
    """Clean the memory cache.

    The exclude argument contains job IDs that should not be
    cleaned.

    """
    assert isinstance(exclude, list)
    for job in self._memcache.values():
      if job.id in exclude:
        continue
      if job.GetStatus() not in (constants.JOB_STATUS_QUEUED,
                                 constants.JOB_STATUS_RUNNING):
        logging.debug("Cleaning job %s from the cache", job.id)
        try:
          del self._memcache[job.id]
        except KeyError:
          pass

  @utils.LockedMethod
  def UpdateJob(self, job):
    return self._UpdateJobUnlocked(job)

  @utils.LockedMethod
  def ArchiveJob(self, job_id):
    """Archives a job.

    @type job_id: string
    @param job_id: Job ID of job to be archived.

    """
    logging.debug("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return

    if not self._ShouldJobBeArchivedUnlocked(job):
      return

    try:
      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)

      os.rename(old, new)

      logging.debug("Successfully archived job %s", job.id)
    finally:
      # Cleaning the cache because we don't know what os.rename actually did
      # and to be on the safe side.
      self._CleanCacheUnlocked([])


class JobQueue:
  """The job queue.

  """
  def __init__(self, context):
    self._lock = threading.Lock()
    self._jobs = DiskJobStorage("")
    self._wpool = _JobQueueWorkerPool(context)

    for job in self._jobs.GetJobs(None):
      status = job.GetStatus()
      if status in (constants.JOB_STATUS_QUEUED, ):
        self._wpool.AddTask(job)

      elif status in (constants.JOB_STATUS_RUNNING, ):
        logging.warning("Unfinished job %s found: %s", job.id, job)
        job.SetUnclean("Unclean master daemon shutdown")

  @utils.LockedMethod
  def SubmitJob(self, ops, nodes):
    """Add a new job to the queue.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type ops: list
    @param ops: the sequence of opcodes that will become the new job
    @type nodes: list
    @param nodes: the list of nodes to which the queue should be
                  distributed

    """
    job = self._jobs.AddJob(ops, nodes)

    # Add to worker pool
    self._wpool.AddTask(job)

    return job.id
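
  # Usage sketch (illustrative; "an_opcode" stands for any opcodes.OpCode
  # instance and "node_names" for the cluster's node name list):
  #
  #   queue = JobQueue(context)
  #   job_id = queue.SubmitJob([an_opcode], node_names)
  #
  # The returned job_id is the zero-padded string produced by FormatJobID().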

  def ArchiveJob(self, job_id):
    self._jobs.ArchiveJob(job_id)

  def CancelJob(self, job_id):
    raise NotImplementedError()

  def _GetJobInfo(self, job, fields):
    row = []
    for fname in fields:
      if fname == "id":
        row.append(job.id)
      elif fname == "status":
        row.append(job.GetStatus())
      elif fname == "ops":
        row.append([op.GetInput().__getstate__() for op in job._ops])
      elif fname == "opresult":
        row.append([op.GetResult() for op in job._ops])
      elif fname == "opstatus":
        row.append([op.GetStatus() for op in job._ops])
      elif fname == "ticker":
        ji = job.GetRunOpIndex()
        if ji < 0:
          lmsg = None
        else:
          lmsg = job._ops[ji].RetrieveLog(-1)
          # message might be empty here
          if lmsg:
            lmsg = lmsg[0]
          else:
            lmsg = None
        row.append(lmsg)
      else:
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
    return row

  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in the queue.

    Args:
    - job_ids: Sequence of job identifiers or None for all
    - fields: Names of fields to return

    """
    self._lock.acquire()
    try:
      jobs = []

      for job in self._jobs.GetJobs(job_ids):
        if job is None:
          jobs.append(None)
        else:
          jobs.append(self._GetJobInfo(job, fields))

      return jobs
    finally:
      self._lock.release()
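
  # Query sketch (illustrative values): the supported field names are the
  # ones handled in _GetJobInfo above -- "id", "status", "ops", "opresult",
  # "opstatus" and "ticker". A call such as
  #
  #   queue.QueryJobs(["0000000042"], ["id", "status"])
  #
  # returns one row per requested job, e.g.
  # [["0000000042", constants.JOB_STATUS_SUCCESS]], with None in place of
  # jobs that could not be loaded.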

  @utils.LockedMethod
  def Shutdown(self):
    """Stops the job queue.

    """
    self._wpool.TerminateWorkers()
    self._jobs.Close()