3 Copyright (c) 2011-2013 ARM Limited
5 Licensed under the Apache License, Version 2.0 (the "License");
6 you may not use this file except in compliance with the License.
7 You may obtain a copy of the License at
9 http://www.apache.org/licenses/LICENSE-2.0
11 Unless required by applicable law or agreed to in writing, software
12 distributed under the License is distributed on an "AS IS" BASIS,
13 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 See the License for the specific language governing permissions and
15 limitations under the License.
20 make.py -m LPC1768 -t ARM -d E:\ -n NET_14
21 udp_link_layer_auto.py -p COM20 -d E:\ -t 10
28 from sys import stdout
29 from time import time, sleep
30 from host_test import DefaultTest
31 from SocketServer import BaseRequestHandler, UDPServer
# Datagrams received by UDPEchoClient_Handler: maps the datagram text to the
# time() reading taken when it arrived.
dict_udp_recv_datagrams = {}

# Datagrams sent to the device: maps the payload text to the time() reading
# taken when it was sent.
dict_udp_sent_datagrams = {}
class UDPEchoClient_Handler(BaseRequestHandler):
    """Request handler that records each datagram delivered by the UDP server."""

    # NOTE(review): the "def handle(self):" header and the docstring terminator
    # are absent from the numbered listing (original lines 42 and 44); they are
    # restored here from the BaseRequestHandler contract -- confirm against the
    # full file.
    def handle(self):
        """Record one received datagram in dict_udp_recv_datagrams.

        self.request is unpacked as a (data, socket) pair. The datagram is
        stored under its repr() with the first and last characters removed,
        and the stored value is the time() reading at arrival.
        """
        payload, _sock = self.request
        # For a Python 2 str payload, repr() adds one quote at each end;
        # slicing them off leaves the (escaped) payload text as the key.
        key = repr(payload)[1:-1]
        dict_udp_recv_datagrams[key] = time()
def udp_packet_recv(threadName, server_ip, server_port):
    """Run a UDP server that receives the packet stream from the mbed device.

    Args:
        threadName: Label supplied by the caller; not used by this function.
        server_ip: Address the UDPServer is bound to.
        server_port: Port the UDPServer is bound to.

    The call blocks in serve_forever(); every incoming datagram is handed to
    UDPEchoClient_Handler.
    """
    listen_address = (server_ip, server_port)
    server = UDPServer(listen_address, UDPEchoClient_Handler)
    # A single parenthesised argument prints the same text under Python 2's
    # print statement.
    print("[UDP_COUNTER] Listening for connections... %s:%d" % listen_address)
    server.serve_forever()
# Host-side UDP echo stress test. From the visible lines: it reads the
# "Server IP Address is a.b.c.d:port" line from the device's serial output,
# starts a background UDP receiver, bursts TEST_PACKET_COUNT datagrams at the
# device, and compares the share of datagrams recorded in
# dict_udp_recv_datagrams with PACKET_SATURATION_RATIO.
#
# NOTE(review): this class is a numbered listing. The leading number on each
# line is the original file's line number, indentation has been flattened, and
# gaps in the numbering (63-64, 73-74, 77-79, 81-84, 98, 100-101, 120, 123,
# 125-126, ...) mean lines are missing. Method boundaries and try/except
# structure therefore cannot be confirmed from here.
# NOTE(review): re, socket, uuid and thread are used below, but their imports
# are not among the visible import lines -- confirm against the full file.
59 class UDPEchoServerTest(DefaultTest):
60 ECHO_SERVER_ADDRESS = "" # UDP IP of datagram bursts
61 ECHO_PORT = 0 # UDP port for datagram bursts
62 CONTROL_PORT = 23 # TCP port used to get stats from mbed device, e.g. counters
65 TEST_PACKET_COUNT = 1000 # how many packets should be sent
66 TEST_STRESS_FACTOR = 0.001 # delay between datagrams, passed to sleep() in seconds (0.001 s = 1 ms)
67 PACKET_SATURATION_RATIO = 29.9 # Acceptable packet transmission in %
# NOTE(review): the dots between the (\d+) groups are unescaped, so each one
# matches any character, and the pattern is not a raw string.
69 PATTERN_SERVER_IP = "Server IP Address is (\d+).(\d+).(\d+).(\d+):(\d+)"
70 re_detect_server_ip = re.compile(PATTERN_SERVER_IP)
# Opens a TCP connection to (ECHO_SERVER_ADDRESS, CONTROL_PORT) and receives up
# to BUFFER_SIZE bytes from it.
# NOTE(review): BUFFER_SIZE is not defined in the visible lines, and neither a
# use of `command` nor a return statement is shown; presumably they are in the
# missing lines 73-74, 77-79 and 81-84 -- confirm against the full file.
72 def get_control_data(self, command="stat\n"):
75 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
76 s.connect((self.ECHO_SERVER_ADDRESS, self.CONTROL_PORT))
80 data = s.recv(BUFFER_SIZE)
# NOTE(review): line 84 is missing. The statements from here on read
# self.mbed and return RESULT_* codes, so they presumably form the body of the
# test method -- confirm the method header against the full file.
# Read one line from the device's serial port; no line means a serial I/O error.
85 serial_ip_msg = self.mbed.serial_readline()
86 if serial_ip_msg is None:
87 return self.RESULT_IO_SERIAL
88 stdout.write(serial_ip_msg)
90 # Searching for IP address and port prompted by server
91 m = self.re_detect_server_ip.search(serial_ip_msg)
# The pattern has five groups, so len(m.groups()) is non-zero whenever m
# matched; the condition is equivalent to "if m".
92 if m and len(m.groups()):
# Groups 1-4 are the address octets, group 5 is the port.
93 self.ECHO_SERVER_ADDRESS = ".".join(m.groups()[:4])
94 self.ECHO_PORT = int(m.groups()[4]) # must be integer for socket.connect method
95 self.notify("HOST: UDP Server found at: " + self.ECHO_SERVER_ADDRESS + ":" + str(self.ECHO_PORT))
97 # Open client socket to burst datagrams to UDP server in mbed
99 self.s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# NOTE(review): "e" is not bound in the visible lines; the try/except around
# the socket creation (lines 98 and 100-101) is missing from the listing.
102 self.notify("HOST: Error: %s"% e)
103 return self.RESULT_ERROR
105 # UDP replied receiver works in background to get echoed datagrams
# The receiver is bound to this host's resolved address, on the device's UDP
# port + 1, and runs udp_packet_recv in a separate thread.
106 SERVER_IP = str(socket.gethostbyname(socket.getfqdn()))
107 SERVER_PORT = self.ECHO_PORT + 1
108 thread.start_new_thread(udp_packet_recv, ("Thread-udp-recv", SERVER_IP, SERVER_PORT))
# Burst loop: each payload is "<index>__<uuid4>", so every datagram is unique.
# The send time is recorded, then the loop pauses TEST_STRESS_FACTOR seconds.
112 for no in range(self.TEST_PACKET_COUNT):
113 TEST_STRING = str(uuid.uuid4())
114 payload = str(no) + "__" + TEST_STRING
115 self.s.sendto(payload, (self.ECHO_SERVER_ADDRESS, self.ECHO_PORT))
116 dict_udp_sent_datagrams[payload] = time()
117 sleep(self.TEST_STRESS_FACTOR)
# NOTE(review): the body of this "if" (line 120) is missing from the listing.
119 if self.s is not None:
122 # Wait 5 seconds for packets to come
# NOTE(review): "result" and "d" are used below but not assigned in the
# visible lines (123 and 125-126 are missing).
124 self.notify("HOST: Test Summary:")
# Percentage of TEST_PACKET_COUNT that is present in dict_udp_recv_datagrams.
127 summary_datagram_success = (float(len(dict_udp_recv_datagrams)) / float(self.TEST_PACKET_COUNT)) * 100.0
# NOTE(review): the message labels TEST_STRESS_FACTOR as "ms", but the value
# is the seconds figure passed to sleep().
128 self.notify("HOST: Datagrams received after +%d sec: %.3f%% (%d / %d), stress=%.3f ms"% (d,
129 summary_datagram_success,
130 len(dict_udp_recv_datagrams),
131 self.TEST_PACKET_COUNT,
132 self.TEST_STRESS_FACTOR))
# result stays true only while the success percentage is at least
# PACKET_SATURATION_RATIO.
133 result = result and (summary_datagram_success >= self.PACKET_SATURATION_RATIO)
136 # Getting control data from test
138 self.notify("HOST: Mbed Summary:")
139 mbed_stats = self.get_control_data()
140 self.notify(mbed_stats)
141 return self.RESULT_SUCCESS if result else self.RESULT_FAILURE
if __name__ == '__main__':
    # Run the host test only when this file is executed as a script.
    host_test = UDPEchoServerTest()
    host_test.run()