summaryrefslogtreecommitdiffstats
path: root/contrib/wpa/wpa_supplicant/eapol_test.py
blob: 80e7dfcf531dc391bb6fa376e5f83daeb7044a54 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
#!/usr/bin/env python2
#
# eapol_test controller
# Copyright (c) 2015, Jouni Malinen <j@w1.fi>
#
# This software may be distributed under the terms of the BSD license.
# See README for more details.

import argparse
import logging
import os
import Queue
import sys
import threading

logger = logging.getLogger()
dir = os.path.dirname(os.path.realpath(sys.modules[__name__].__file__))
sys.path.append(os.path.join(dir, '..', 'wpaspy'))
import wpaspy
wpas_ctrl = '/tmp/eapol_test'

class eapol_test:
    def __init__(self, ifname):
        self.ifname = ifname
        self.ctrl = wpaspy.Ctrl(os.path.join(wpas_ctrl, ifname))
        if "PONG" not in self.ctrl.request("PING"):
            raise Exception("Failed to connect to eapol_test (%s)" % ifname)
        self.mon = wpaspy.Ctrl(os.path.join(wpas_ctrl, ifname))
        self.mon.attach()

    def add_network(self):
        id = self.request("ADD_NETWORK")
        if "FAIL" in id:
            raise Exception("ADD_NETWORK failed")
        return int(id)

    def remove_network(self, id):
        id = self.request("REMOVE_NETWORK " + str(id))
        if "FAIL" in id:
            raise Exception("REMOVE_NETWORK failed")
        return None

    def set_network(self, id, field, value):
        res = self.request("SET_NETWORK " + str(id) + " " + field + " " + value)
        if "FAIL" in res:
            raise Exception("SET_NETWORK failed")
        return None

    def set_network_quoted(self, id, field, value):
        res = self.request("SET_NETWORK " + str(id) + " " + field + ' "' + value + '"')
        if "FAIL" in res:
            raise Exception("SET_NETWORK failed")
        return None

    def request(self, cmd, timeout=10):
        return self.ctrl.request(cmd, timeout=timeout)

    def wait_event(self, events, timeout=10):
        start = os.times()[4]
        while True:
            while self.mon.pending():
                ev = self.mon.recv()
                logger.debug(self.ifname + ": " + ev)
                for event in events:
                    if event in ev:
                        return ev
            now = os.times()[4]
            remaining = start + timeout - now
            if remaining <= 0:
                break
            if not self.mon.pending(timeout=remaining):
                break
        return None

def run(ifname, count, no_fast_reauth, res):
    et = eapol_test(ifname)

    et.request("AP_SCAN 0")
    if no_fast_reauth:
        et.request("SET fast_reauth 0")
    else:
        et.request("SET fast_reauth 1")
    id = et.add_network()
    et.set_network(id, "key_mgmt", "IEEE8021X")
    et.set_network(id, "eapol_flags", "0")
    et.set_network(id, "eap", "TLS")
    et.set_network_quoted(id, "identity", "user")
    et.set_network_quoted(id, "ca_cert", 'ca.pem')
    et.set_network_quoted(id, "client_cert", 'client.pem')
    et.set_network_quoted(id, "private_key", 'client.key')
    et.set_network_quoted(id, "private_key_passwd", 'whatever')
    et.set_network(id, "disabled", "0")

    fail = False
    for i in range(count):
        et.request("REASSOCIATE")
        ev = et.wait_event(["CTRL-EVENT-CONNECTED", "CTRL-EVENT-EAP-FAILURE"])
        if ev is None or "CTRL-EVENT-CONNECTED" not in ev:
            fail = True
            break

    et.remove_network(id)

    if fail:
        res.put("FAIL (%d OK)" % i)
    else:
        res.put("PASS %d" % (i + 1))

def main():
    parser = argparse.ArgumentParser(description='eapol_test controller')
    parser.add_argument('--ctrl', help='control interface directory')
    parser.add_argument('--num', help='number of processes')
    parser.add_argument('--iter', help='number of iterations')
    parser.add_argument('--no-fast-reauth', action='store_true',
                        dest='no_fast_reauth',
                        help='disable TLS session resumption')
    args = parser.parse_args()

    num = int(args.num)
    iter = int(args.iter)
    if args.ctrl:
        global wpas_ctrl
        wpas_ctrl = args.ctrl

    t = {}
    res = {}
    for i in range(num):
        res[i] = Queue.Queue()
        t[i] = threading.Thread(target=run, args=(str(i), iter,
                                                  args.no_fast_reauth, res[i]))
    for i in range(num):
        t[i].start()
    for i in range(num):
        t[i].join()
        try:
            results = res[i].get(False)
        except:
            results = "N/A"
        print "%d: %s" % (i, results)

if __name__ == "__main__":
    main()
OpenPOWER on IntegriCloud