Statistics
| Branch: | Tag: | Revision:

root / commissioning / api / specificator.py @ 92cb0768

History | View | Annotate | Download (17.8 kB)

1
# -*- coding: utf8 -*- 
2
from random import random, choice, randint
3
from math import log
4
from inspect import isclass
5

    
6
def shorts(s):
7
    if not isinstance(s, unicode):
8
        s = str(s)
9

    
10
    if len(s) <= 64:
11
        return s
12

    
13
    return s[:61] + '...'
14
        
15

    
16
class CanonifyException(Exception):
17
    pass
18

    
19
class SpecifyException(Exception):
20
    pass
21

    
22

    
23
class Canonical(object):
24

    
25
    random_choice = None
26

    
27
    def __init__(self, *args, **kw):
28
        self.args = args
29
        self.kw = kw
30
        self.name = kw.pop('classname', self.__class__.__name__)
31
        random_choice = kw.pop('random', None)
32
        if random_choice is not None:
33
            self.random_choice = random_choice
34
        opts = {}
35
        for k, v in kw.items():
36
            if not isinstance(v, Canonical):
37
                if isclass(v) and issubclass(v, Canonical):
38
                    m = ("argument '%s': value '%s' is a Canonical _class_. "
39
                         "Perhaps you meant to specify a Canonical _instance_"
40
                         % (k, v))
41
                    raise SpecifyException(m)
42
                opts[k] = v
43
                del kw[k]
44

    
45
        self.opts = opts
46
        self.init()
47

    
48
        if 'default' in opts:
49
            item = opts['default']
50
            if item is None:
51
                opts['null'] = 1
52
            else:
53
                opts['default'] = self.check(item)
54

    
55
    def init(self):
56
        return
57

    
58
    def __call__(self, item):
59
        opts = self.opts
60
        if item is None and 'default' in opts:
61
            item = opts['default']
62

    
63
        can_be_null = opts.get('null', False)
64
        if item is None and can_be_null:
65
            return None
66

    
67
        return self.check(item)
68

    
69
    def check(self, item):
70
        return item
71

    
72
    def create(self):
73
        return None
74

    
75
    def random(self, **kw):
76
        random_choice = self.random_choice
77
        if random_choice is None:
78
            return None
79

    
80
        if callable(random_choice):
81
            return random_choice(kw)
82

    
83
        if isinstance(random_choice, str):
84
            return getattr(self, random_choice)(kw)
85

    
86
        return choice(random_choice)
87

    
88
    def tostring(self, depth=0, showopts=0, multiline=0):
89
        depth += 1
90
        if not multiline:
91
            argdepth = ''
92
            owndepth = ''
93
            joinchar = ','
94
            padchar = ''
95
        else:
96
            argdepth = '    ' * depth
97
            owndepth = '    ' * (depth - 1)
98
            joinchar = ',\n'
99
            padchar = '\n'
100

    
101
        args = [a.tostring( depth=depth,
102
                            showopts=showopts,
103
                            multiline=multiline) for a in self.args]
104
        args += [("%s=%s" %
105
                    (k, v.tostring( depth=depth,
106
                                    showopts=showopts,
107
                                    multiline=multiline)))
108
                                    
109
                                    for k, v in self.kw.items()]
110
        if showopts:
111
            args += [("%s=%s" % (k, str(v))) for k, v in self.opts.items()]
112

    
113
        if len(args) == 0:
114
            string = "%s(%s)" % (self.name, ','.join(args))
115
        else:
116
            string = "%s(%s" % (self.name, padchar)
117
            for arg in args:
118
                string += argdepth + arg + joinchar
119
            string = string[:-1] + padchar
120
            string += owndepth + ")"
121

    
122
        return string
123

    
124
    __str__ = tostring
125

    
126
    def __repr__(self):
127
        return self.tostring(multiline=0, showopts=1)
128

    
129
    def check(item):
130
        canonified = item
131
        return canonified
132

    
133

    
134
class Null(Canonical):
135

    
136
    def check(self, item):
137
        return None
138

    
139
Nothing = Null()
140

    
141

    
142
class Integer(Canonical):
143

    
144
    def check(self, item):
145
        try:
146
            num = long(item)
147
        except ValueError, e:
148
            try:
149
                num = long(item, 16)
150
            except Exception:
151
                m = "%s: cannot convert '%s' to long" % (self, shorts(item))
152
                raise CanonifyException(m)
153
        except TypeError, e:
154
            m = "%s: cannot convert '%s' to long" % (self, shorts(item))
155
            raise CanonifyException(m)
156

    
157
        optget = self.opts.get
158
        minimum = optget('minimum', None)
159
        maximum = optget('maximum', None)
160

    
161
        if minimum is not None and num < minimum:
162
            m = "%s: %d < minimum=%d" % (self, num, minimum)
163
            raise CanonifyException(m)
164

    
165
        if maximum is not None and num > maximum:
166
            m = "%s: %d > maximum=%d" % (self, num, maximum)
167
            raise CanonifyException(m)
168

    
169
        return num
170

    
171
    def random_integer(self, kw):
172
        optget = self.opts.get
173
        kwget = kw.get
174
        minimum = kwget('minimum', optget('minimum', -4294967296L))
175
        maximum = kwget('maximum', optget('maximum', 4294967295L))
176
        r = random()
177
        if r < 0.1:
178
            return minimum
179
        if r < 0.2:
180
            return maximum
181
        if minimum <= 0 and maximum >= 0 and r < 0.3:
182
            return 0L
183
        return long(minimum + r * (maximum - minimum))
184

    
185
    random_choice = random_integer
186

    
187

    
188

    
189
Serial = Integer(
190
            classname   =   'Serial',
191
            null        =   True,
192
)
193

    
194

    
195
class Text(Canonical):
196

    
197
    re = None
198
    matcher = None
199
    choices = None
200

    
201
    def init(self):
202
        opts = self.opts
203
        if 'regex' in opts:
204
            pat = opts['regex']
205
            re = self.re
206
            if re is None:
207
                import re
208
                self.re = re
209

    
210
            self.matcher = re.compile(pat, re.UNICODE)
211
            self.pat = pat
212

    
213
        if 'choices' in opts:
214
            opts['choices'] = dict((unicode(x), unicode(x))
215
                                    for x in opts['choices'])
216

    
217
    def check(self, item):
218
        if not isinstance(item, unicode):
219
            # require non-unicode items to be utf8
220
            item = str(item)
221
            try:
222
                item = item.decode('utf8')
223
            except UnicodeDecodeError, e:
224
                item = item.decode('latin1')
225
                m = "%s: non-unicode '%s' is not utf8" % (self, shorts(item))
226
                raise CanonifyException(m)
227

    
228
        opts = self.opts
229
        if 'choices' in opts:
230
            choices = opts['choices']
231
            try:
232
                unknown = item not in choices
233
            except TypeError, e:
234
                m = "%s: unhashable type '%s'" % (self.name, shorts(item))
235
                raise CanonifyException(m, e)
236

    
237
            if unknown:
238
                m = "%s: '%s' not in choices" % (self.name, shorts(item))
239
                raise CanonifyException(m)
240

    
241
            return choices[item]
242

    
243
        optget = opts.get
244
        itemlen = len(item)
245
        maxlen = optget('maxlen', None)
246
        if maxlen is not None and itemlen > maxlen:
247
            m = "%s: len('%s') > maxlen=%d" % (self, shorts(item), maxlen)
248
            raise CanonifyException(m)
249

    
250
        minlen = optget('minlen', None)
251
        if minlen is not None and itemlen < minlen:
252
            m = "%s: len('%s') < minlen=%d" % (self, shorts(item), minlen)
253
            raise CanonifyException(m)
254
            
255
        matcher = self.matcher
256
        if matcher is not None:
257
            match = matcher.match(item)
258
            if  (       match is None
259
                    or  (match.start(), match.end()) != (0, itemlen)    ):
260

    
261
                    m = ("%s: '%s' does not match '%s'"
262
                            % (self, shorts(item), self.pat))
263
                    raise CanonifyException(m)
264

    
265
        return item
266

    
267
    default_alphabet = '0123456789αβγδεζ'.decode('utf8')
268

    
269
    def random_string(self, kw):
270
        opts = self.opts
271
        if 'regex' in opts:
272
            m = 'Unfortunately, random for regex strings not supported'
273
            raise ValueError(m)
274

    
275
        optget = opts.get
276
        kwget = kw.get
277
        minlen = kwget('minlen', optget('minlen', 0))
278
        maxlen = kwget('maxlen', optget('maxlen', 32))
279
        alphabet = kwget('alphabet', self.default_alphabet)
280
        z = maxlen - minlen
281
        if z < 1:
282
            z = 1
283

    
284
        g = log(z, 2)
285
        r = random() * g
286
        z = minlen + int(2**r)
287

    
288
        s = u''
289
        for _ in xrange(z):
290
            s += choice(alphabet)
291

    
292
        return s
293

    
294
    random_choice = random_string
295

    
296

    
297
class Bytes(Canonical):
298

    
299
    re = None
300
    matcher = None
301
    choices = None
302

    
303
    def init(self):
304
        opts = self.opts
305
        if 'regex' in opts:
306
            pat = opts['regex']
307
            re = self.re
308
            if re is None:
309
                import re
310
                self.re = re
311

    
312
            self.matcher = re.compile(pat)
313
            self.pat = pat
314

    
315
        if 'choices' in opts:
316
            opts['choices'] = dict((str(x), str(x))
317
                                    for x in opts['choices'])
318

    
319
    def check(self, item):
320
        if isinstance(item, unicode):
321
            # convert unicode to utf8
322
            item = item.encode('utf8')
323

    
324
        opts = self.opts
325
        if 'choices' in opts:
326
            choices = opts['choices']
327
            try:
328
                unknown = item not in choices
329
            except TypeError, e:
330
                m = "%s: unhashable type '%s'" % (self.name, shorts(item))
331
                raise CanonifyException(m, e)
332

    
333
            if unknown:
334
                m = "%s: '%s' not in choices" % (self.name, shorts(item))
335
                raise CanonifyException(m)
336

    
337
            return choices[item]
338

    
339
        optget = opts.get
340
        itemlen = len(item)
341
        maxlen = optget('maxlen', None)
342
        if maxlen is not None and itemlen > maxlen:
343
            m = "%s: len('%s') > maxlen=%d" % (self, shorts(item), maxlen)
344
            raise CanonifyException(m)
345

    
346
        minlen = optget('minlen', None)
347
        if minlen is not None and itemlen < minlen:
348
            m = "%s: len('%s') < minlen=%d" % (self, shorts(item), minlen)
349
            raise CanonifyException(m)
350
            
351
        matcher = self.matcher
352
        if matcher is not None:
353
            match = matcher.match(item)
354
            if  (       match is None
355
                    or  (match.start(), match.end()) != (0, itemlen)    ):
356

    
357
                    m = ("%s: '%s' does not match '%s'"
358
                            % (self, shorts(item), self.pat))
359
                    raise CanonifyException(m)
360

    
361
        return item
362

    
363
    default_alphabet = '0123456789abcdef'
364

    
365
    def random_bytes(self, kw):
366
        opts = self.opts
367
        if 'regex' in opts:
368
            m = 'Unfortunately, random for regex strings not supported'
369
            raise ValueError(m)
370

    
371
        optget = opts.get
372
        kwget = kw.get
373
        minlen = kwget('minlen', optget('minlen', 0))
374
        maxlen = kwget('maxlen', optget('maxlen', 32))
375
        alphabet = kwget('alphabet', self.default_alphabet)
376
        z = maxlen - minlen
377
        if z < 1:
378
            z = 1
379

    
380
        g = log(z, 2)
381
        r = random() * g
382
        z = minlen + int(2**r)
383

    
384
        s = u''
385
        for _ in xrange(z):
386
            s += choice(alphabet)
387

    
388
        return s
389

    
390
    random_choice = random_bytes
391

    
392

    
393
class ListOf(Canonical):
394

    
395
    def init(self):
396
        args = self.args
397
        kw = self.kw
398

    
399
        if not (args or kw):
400
            raise SpecifyException("ListOf requires one or more arguments")
401

    
402
        if args and kw:
403
            m = ("ListOf requires either positional "
404
                 "or keyword arguments, but not both")
405
            raise SpecifyException(m)
406

    
407
        if args:
408
            if len(args) > 1:
409
                self.canonical = Tuple(*args)
410
            else:
411
                self.canonical = args[0]
412
        else:
413
            self.canonical = Args(**kw)
414

    
415
    def check(self, item):
416
        if item is None:
417
            item = ()
418

    
419
        try:
420
            items = iter(item)
421
        except TypeError, e:
422
            m = "%s: %s is not iterable" % (self, shorts(item))
423
            raise CanonifyException(m)
424

    
425
        canonical = self.canonical
426
        canonified = []
427
        append = canonified.append
428

    
429
        for item in items:
430
            item = canonical(item)
431
            append(item)
432

    
433
        if not canonified and self.opts.get('nonempty', False):
434
            m = "%s: must be nonempty" % (self,)
435
            raise CanonifyException(m)
436

    
437
        return canonified
438

    
439
    def random_listof(self, kw):
440
        z = randint(1, 4)
441
        get_random = self.canonical.random
442

    
443
        return [get_random() for _ in xrange(z)]
444

    
445
    random_choice = random_listof
446

    
447

    
448
class Args(Canonical):
449

    
450
    def init(self):
451
        if self.args:
452
            raise ValueError("Args accepts only keyword arguments")
453

    
454
    def check(self, item):
455
        try:
456
            item = dict(item)
457
        except TypeError, e:
458
            m = "%s: %s is not dict-able" % (self, shorts(item))
459
            raise CanonifyException(m)
460

    
461
        canonified = {}
462

    
463
        try:
464
            for n, c in self.kw.items():
465
                t = item[n] if n in item else None
466
                canonified[n] = c(t)
467
        except KeyError:
468
            m = ("%s: Argument '%s' not found in '%s'" 
469
                        % (self, shorts(n), shorts(item)))
470
            raise CanonifyException(m)
471

    
472
        return canonified
473

    
474
    def random_args(self, kw):
475
        args = {}
476
        for n, c in self.kw.items():
477
            args[n] = c.random()
478
        return args
479

    
480
    random_choice = random_args
481

    
482

    
483
class Tuple(Canonical):
484

    
485
    def check(self, item):
486
        try:
487
            items = list(item)
488
        except TypeError, e:
489
            m = "%s: %s is not iterable" % (self, shorts(item))
490
            raise CanonifyException(m)
491

    
492
        canonicals = self.args
493
        zi = len(items)
494
        zc = len(canonicals)
495

    
496
        if zi != zc:
497
            m = "%s: expecting %d elements, not %d (%s)" % (self, zc, zi, str(items))
498
            raise CanonifyException(m)
499

    
500
        g = (canonical(element) for canonical, element in zip(self.args, item))
501

    
502
        return tuple(g)
503

    
504
    def __add__(self, other):
505
        oargs = other.args if isinstance(other, Tuple) else (other,)
506
        args = self.args + oargs
507
        return self.__class__(*args)
508

    
509
    def random_tuple(self, kw):
510
        return tuple(c.random() for c in self.args)
511

    
512
    random_choice = random_tuple
513

    
514

    
515
class Dict(Canonical):
516

    
517
    def check(self, item):
518

    
519
        try:
520
            item = dict(item)
521
        except TypeError:
522
            m = "%s: '%s' is not dict-able" % (self, shorts(item))
523
            raise CanonifyException(m)
524

    
525
        canonified = {}
526
        canonical = self.kw
527

    
528
        for n, c in canonical.items():
529
            if n not in item:
530
                m = "%s: key '%s' not found" % (self, shorts(n))
531
                raise CanonifyException(m)
532
            canonified[n] = c(item[n])   
533

    
534
        strict = self.opts.get('strict', False)
535
        if strict and len(item) != len(canonical):
536
            for k in sorted(item.keys()):
537
                if k not in canonical:
538
                    break
539

    
540
            m = "%s: unexpected key '%s' (strict mode)" % (self, shorts(k))
541
            raise CanonifyException(m)
542

    
543
        return canonified
544

    
545
    def random_dict(self, kw):
546
        item = {}
547
        for n, c in self.canonical.items():
548
            item[n] = c.random()
549

    
550
        return item
551

    
552
    random_choice = random_dict
553

    
554

    
555
class Canonifier(object):
556
    def __init__(self, name, input_canonicals, output_canonicals):
557
        self.name = name
558
        self.input_canonicals = dict(input_canonicals)
559
        self.output_canonicals = dict(output_canonicals)
560

    
561
    def call_names(self):
562
        return self.input_canonicals.keys()
563

    
564
    def call_attrs(self):
565
        for call_name, canonical in self.input_canonicals.iteritems():
566
            yield call_name, canonical.tostring(showopts=1, multiline=1)
567

    
568
    def input_canonical(self, name):
569
        input_canonicals = self.input_canonicals
570
        if name not in input_canonicals:
571
            m = "%s: Invalid input call '%s'" % (self.name, name)
572
            raise CanonifyException(m)
573

    
574
        return input_canonicals[name]
575

    
576
    def canonify_input(self, name, the_input):
577
        return self.input_canonical(name)(the_input)
578

    
579
    def output_canonical(self, name):
580
        output_canonicals = self.output_canonicals
581
        if name not in output_canonicals:
582
            m = "%s: Output canonical '%s' does not exist" % (self.name, name)
583
            raise CanonifyException(m)
584

    
585
        return output_canonicals[name]
586

    
587
    def canonify_output(self, name, the_output):
588
        return self.output_canonical(name)(the_output)
589

    
590

    
591
class Specificator(object):
592

    
593
    def __new__(cls):
594
        if cls is Specificator:
595
            m = "Specificator classes must be subclassed"
596
            raise SpecifyException(m)
597

    
598
        import inspect
599

    
600
        canonical_inputs = {}
601
        canonical_outputs = {}
602

    
603
        for name in dir(cls):
604
            f = getattr(cls, name)
605
            if not inspect.ismethod(f) or f.__name__.startswith('_'):
606
                continue
607

    
608
            argspec = inspect.getargspec(f)
609
            defaults = argspec.defaults
610
            args = argspec.args
611
            if args and args[0] == 'self':
612
                args = args[1:]
613

    
614
            if not defaults:
615
                defaults = ()
616

    
617
            arglen = len(args)
618
            deflen = len(defaults)
619

    
620
            if arglen != deflen:
621
                a = (f.__name__, args[:arglen-deflen])
622
                m = "Unspecified arguments in '%s': %s" % a
623
                raise SpecifyException(m)
624

    
625
            args = dict(zip(args, defaults))
626
            for a, c in args.items():
627
                if not isinstance(c, Canonical):
628
                    m = ("argument '%s=%s' is not an instance of 'Canonical'"
629
                         % (a, repr(c)))
630
                    raise SpecifyException(m)
631

    
632
            canonical = Null() if len(args) == 0 else Args(**args)
633
            canonical_inputs[name] = canonical
634

    
635
            self = object.__new__(cls)
636
            canonical = f(self)
637
            if not isinstance(canonical, Canonical):
638
                m = ("method '%s' does not return a Canonical, but a(n) %s "
639
                                                    % (name, type(canonical)))
640
                raise SpecifyException(m)
641
            canonical_outputs[name] = canonical
642

    
643
        return Canonifier(cls.__name__, canonical_inputs, canonical_outputs)
644

    
645
    def __call__(self):
646
        return self
647