]> git.friedersdorff.com Git - max/tmk_keyboard.git/blob - tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/udp_link_layer_auto.py
xt_usb: Fix XT soft reset
[max/tmk_keyboard.git] / tmk_core / tool / mbed / mbed-sdk / workspace_tools / host_tests / udp_link_layer_auto.py
1 """
2 mbed SDK
3 Copyright (c) 2011-2013 ARM Limited
4
5 Licensed under the Apache License, Version 2.0 (the "License");
6 you may not use this file except in compliance with the License.
7 You may obtain a copy of the License at
8
9     http://www.apache.org/licenses/LICENSE-2.0
10
11 Unless required by applicable law or agreed to in writing, software
12 distributed under the License is distributed on an "AS IS" BASIS,
13 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 See the License for the specific language governing permissions and
15 limitations under the License.
16 """
17
18 """
19 How to use:
20 make.py -m LPC1768 -t ARM -d E:\ -n NET_14
21 udp_link_layer_auto.py -p COM20 -d E:\ -t 10
22 """
23
24 import re
25 import uuid
26 import socket
27 import thread
28 from sys import stdout
29 from time import time, sleep
30 from host_test import DefaultTest
31 from SocketServer import BaseRequestHandler, UDPServer
32
33
34 # Received datagrams (with time)
35 dict_udp_recv_datagrams = dict()
36
37 # Sent datagrams (with time)
38 dict_udp_sent_datagrams = dict()
39
40
41 class UDPEchoClient_Handler(BaseRequestHandler):
42     def handle(self):
43         """ One handle per connection
44         """
45         _data, _socket = self.request
46         # Process received datagram
47         data_str = repr(_data)[1:-1]
48         dict_udp_recv_datagrams[data_str] = time()
49
50
51 def udp_packet_recv(threadName, server_ip, server_port):
52     """ This function will receive packet stream from mbed device
53     """
54     server = UDPServer((server_ip, server_port), UDPEchoClient_Handler)
55     print "[UDP_COUNTER] Listening for connections... %s:%d"% (server_ip, server_port)
56     server.serve_forever()
57
58
59 class UDPEchoServerTest(DefaultTest):
60     ECHO_SERVER_ADDRESS = ""       # UDP IP of datagram bursts
61     ECHO_PORT = 0                  # UDP port for datagram bursts
62     CONTROL_PORT = 23              # TCP port used to get stats from mbed device, e.g. counters
63     s = None                       # Socket
64
65     TEST_PACKET_COUNT = 1000       # how many packets should be send
66     TEST_STRESS_FACTOR = 0.001     # stress factor: 10 ms
67     PACKET_SATURATION_RATIO = 29.9 # Acceptable packet transmission in %
68
69     PATTERN_SERVER_IP = "Server IP Address is (\d+).(\d+).(\d+).(\d+):(\d+)"
70     re_detect_server_ip = re.compile(PATTERN_SERVER_IP)
71
72     def get_control_data(self, command="stat\n"):
73         BUFFER_SIZE = 256
74         try:
75             s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
76             s.connect((self.ECHO_SERVER_ADDRESS, self.CONTROL_PORT))
77         except Exception, e:
78             data = None
79         s.send(command)
80         data = s.recv(BUFFER_SIZE)
81         s.close()
82         return data
83
84     def test(self):
85         serial_ip_msg = self.mbed.serial_readline()
86         if serial_ip_msg is None:
87             return self.RESULT_IO_SERIAL
88         stdout.write(serial_ip_msg)
89         stdout.flush()
90         # Searching for IP address and port prompted by server
91         m = self.re_detect_server_ip.search(serial_ip_msg)
92         if m and len(m.groups()):
93             self.ECHO_SERVER_ADDRESS = ".".join(m.groups()[:4])
94             self.ECHO_PORT = int(m.groups()[4]) # must be integer for socket.connect method
95             self.notify("HOST: UDP Server found at: " + self.ECHO_SERVER_ADDRESS + ":" + str(self.ECHO_PORT))
96
97         # Open client socket to burst datagrams to UDP server in mbed
98         try:
99             self.s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
100         except Exception, e:
101             self.s = None
102             self.notify("HOST: Error: %s"% e)
103             return self.RESULT_ERROR
104
105         # UDP replied receiver works in background to get echoed datagrams
106         SERVER_IP = str(socket.gethostbyname(socket.getfqdn()))
107         SERVER_PORT = self.ECHO_PORT + 1
108         thread.start_new_thread(udp_packet_recv, ("Thread-udp-recv", SERVER_IP, SERVER_PORT))
109         sleep(0.5)
110
111         # Burst part
112         for no in range(self.TEST_PACKET_COUNT):
113             TEST_STRING = str(uuid.uuid4())
114             payload = str(no) + "__" + TEST_STRING
115             self.s.sendto(payload, (self.ECHO_SERVER_ADDRESS, self.ECHO_PORT))
116             dict_udp_sent_datagrams[payload] = time()
117             sleep(self.TEST_STRESS_FACTOR)
118
119         if self.s is not None:
120             self.s.close()
121
122         # Wait 5 seconds for packets to come
123         result = True
124         self.notify("HOST: Test Summary:")
125         for d in range(5):
126             sleep(1.0)
127             summary_datagram_success = (float(len(dict_udp_recv_datagrams)) / float(self.TEST_PACKET_COUNT)) * 100.0
128             self.notify("HOST: Datagrams received after +%d sec: %.3f%% (%d / %d), stress=%.3f ms"% (d,
129                                                                                                      summary_datagram_success,
130                                                                                                      len(dict_udp_recv_datagrams),
131                                                                                                      self.TEST_PACKET_COUNT,
132                                                                                                      self.TEST_STRESS_FACTOR))
133             result = result and (summary_datagram_success >= self.PACKET_SATURATION_RATIO)
134             stdout.flush()
135
136         # Getting control data from test
137         self.notify("...")
138         self.notify("HOST: Mbed Summary:")
139         mbed_stats = self.get_control_data()
140         self.notify(mbed_stats)
141         return self.RESULT_SUCCESS if result else self.RESULT_FAILURE
142
143
144 if __name__ == '__main__':
145     UDPEchoServerTest().run()