案例一:利于Python调用JSON对象来实现对XENA流量测试仪的灵活发包测试,能够适应Pair,Rotate,1-to-Many等多种拓扑模型
2020-12-13 15:20
标签:xen pytho 类型 发送 learning while protocol The 数据 硬件:XENA Valkyrie 或 Vantage主机,测试板卡不限,本方法适用于其100M~400G所有速率端口 环境配置:Python 3 实现功能: 1.控制流量仪进行流量测试,预定配置的流量发送,报文统计,丢包率,延迟等信息的统计 2.单个Python脚本配合不同的JSON对象文件,来实现不同端口数量,不同拓扑模型下的流量测试 3.在发包测试前及运行过程中都对发包端口的链路状态进行监控100M/1G/2.5G/5G/10G 4.嵌入ARP探测,获取DUT的MAC信息,并记录在测试日志中 5.统计端口流量的Tx,Rx信息,丢包率,延迟信息 设计思路:main文件控制主流程,嵌入Statistics模块进行数据统计,CreateClass模块进行命令预定义 运行效果:
主要实现代码: JSON对象配置文件 案例一:利于Python调用JSON对象来实现对XENA流量测试仪的灵活发包测试,能够适应Pair,Rotate,1-to-Many等多种拓扑模型 标签:xen pytho 类型 发送 learning while protocol The 数据 原文地址:https://www.cnblogs.com/xena/p/11579033.html 1 import threading, time, json, random, queue, types, sys, socket
2 from binascii import hexlify
3 from TestUtilsL23 import XenaScriptTools
4 from CreateClass import StreamCreate
5 from Statistics import CollectTestresult
6
7 def BuildDict(ports, configvalue, watching, watching_2):
8 #Generate ports Dictionary
9 a = 1
10 for port in ports:
11 ip_address_b = configvalue[‘IP‘][0:10]
12 ip_address_c = ip_address_b + str(a)
13 mac_a = configvalue[‘mac‘][0:10]
14 if a > 9:
15 mac_address = mac_a + str(a)
16 else:
17 mac_address = mac_a + ‘0‘ + str(a)
18 ip_address_a = hexlify(socket.inet_aton(ip_address_c)).decode()
19 mac_ip = [str(mac_address), str(ip_address_a)]
20 watching[port] = mac_ip
21 a += 1
22
23 watching_2[‘type_eth‘] = ‘FFFF‘,
24 watching_2[‘type_vlan‘] = ‘8100‘,
25 watching_2[‘type_ip‘] = ‘08004500006A000000007F11BA2D‘,
26 watching_2[‘type_udp‘] = ‘00000000‘,
27 watching_2[‘type_tcp‘] = ‘000000000000000050000000E5A40000‘,
28 watching_2[‘tcpid‘] = [hex(int(configvalue[‘tcpPort‘][0]))[2:].zfill(4) + hex(int(configvalue[‘tcpPort‘][1]))[2:].zfill(4)]
29
30
31 def PrintMes(ports, configvalue, ip_address):
32 localtime = time.asctime( time.localtime(time.time()) )
33 print("\n##############################################")
34 print(" 测试配置 ")
35 print("##############################################\n")
36 print("测试时间 : " + str(localtime))
37 print("设备IP地址 : " + ip_address)
38 print("测试端口 : " + str(ports))
39 print("测试模式 : " + configvalue[‘Stype‘])
40 print("持续时间 : " + configvalue[‘testtime‘])
41 print("报文类型 : " + configvalue[‘header_type‘])
42 print("报文长度 : " + configvalue[‘packet‘])
43 if configvalue[‘Stype‘] == ‘Aggregation‘:
44 print("上行VLAN : " + str(configvalue[‘uvlan‘]))
45 print("下行VLAN : " + str(configvalue[‘uvlan‘]))
46 print("WAN速率 : " + str(configvalue[‘wanrate‘]))
47 print("LAN速率 : " + str(configvalue[‘lanrate‘]))
48 else:
49 print("VLAN : " + str(configvalue[‘uvlan‘]))
50 print("速率设置 : " + str(configvalue[‘wanrate‘]))
51 print("端口速率 : " + str(configvalue[‘portrate‘]))
52 print("学习时间 : " + configvalue[‘learntime‘])
53 print("丢包率设置 : " + configvalue[‘threshold‘])
54 if configvalue[‘snlearnenable‘] == ‘1‘:
55 print("自动学习SN : Enable")
56 else:
57 print("自动学习SN : Disable")
58 if configvalue[‘portspeedcheck‘] == ‘1‘:
59 print("端口速率检测 : Enable" )
60 else:
61 print("端口速率检测 : Disable" )
62
63
64 def Maclearning(xm, ports, configvalue):
65 if configvalue[‘snlearnenable‘] == ‘1‘:
66 xm.SendExpectOK(ports [1] + " PS_CREATE [0]")
67 xm.SendExpectOK(ports [1] + " ps_tpldid [0] 0")
68 xm.SendExpectOK(ports [1] + " PS_PACKETLENGTH [0] FIXED 70 1518")
69 xm.SendExpectOK(ports [1] + " PS_HEADERPROTOCOL [0] ETHERNET ARP")
70 learnIP_hex = hexlify(socket.inet_aton(configvalue[‘learnIP‘])).decode()
71 xm.SendExpectOK(ports [1] + " PS_PACKETHEADER [0] 0xFFFFFFFFFFFF00000000000108060001080006040001000000000001C0A80164FFFFFFFFFFFF" + learnIP_hex)
72 xm.SendExpectOK(ports [1] + " PS_RATEPPS [0] 2")
73 xm.SendExpectOK(ports [1] + " PS_ENABLE [0] on")
74 xm.SendExpectOK(ports [1] + " P_TRAFFIC ON")
75 xm.SendExpectOK(ports [1] + " P_CAPTURE ON")
76 time.sleep(1)
77 xm.SendExpectOK(ports [1] + " P_CAPTURE OFF")
78 xm.SendExpectOK(ports [1] + " P_TRAFFIC OFF")
79 SN = xm.Send(ports[1] + " PC_PACKET [0] ?")
80 SN = SN.split(‘ ‘)[-1]
81 xm.SendExpectOK(ports [1] + " PS_DELETE [0]")
82 Serial = str(SN)
83 if SN == ‘
{
"##常用配置修改":"",
"ports": ["0/0","0/1","0/2","0/3","0/4","0/5"],
"packet": "FIXED 64 1518",
"testtime": "10",
"##注意:dvlan和lanrate只有在Aggregation模式生效,其他模式仅需uvlan和wanrate":"",
"##其中vlan设置为-1则表示不添加vlan":"",
"uvlan": "-1",
"dvlan": "-1",
"wanrate": ["100", "100", "100","100", "100", "100"],
"lanrate": ["21", "22", "23","24"],
"portrate": ["1000","1000","1000","1000","1000","1000"],
"##Stype为测试模式,分别可以配置为:Loopback, Eachother, Aggregation":"",
"##其中Loopback为环回测试,Eachother为两两互打测试,Aggregation为汇聚测试":"",
"Stype":"Eachother",
"##headertype为报文类型,分别可以配置为:TCP, UDP, IP, Ethernet":"",
"headertype": "Ethernet",
"##threshold为丢包率设置":"",
"threshold": "0",
"##不常用配置修改":"",
"##发送学习报文的时间":"",
"learntime": "3",
"##payload类型分别有: Pattern, Random":"",
"payload": "Pattern",
"##PRcheck为使能端口状态检测,1为开启,0为关闭;使能后端口为连接的时候会报错":"",
"PRcheck": "1",
"##PScheck为检测端口速率是否匹配,1为开启,0为关闭":"",
"PScheck":"1",
"tcpUdpPort": ["1024", "2048"],
"IP": "192.168.163.1",
"mac": "000000033333",
"##学习DUT的MAC地址":"",
"snlernenable": "0",
"learnIP": "192.168.2.1",
"##测试仪IP地址":"",
"ip_address": "192.168.1.200"
}
上一篇:C++类拷贝控制 深拷贝 浅拷贝
下一篇:Python 函数
文章标题:案例一:利于Python调用JSON对象来实现对XENA流量测试仪的灵活发包测试,能够适应Pair,Rotate,1-to-Many等多种拓扑模型
文章链接:http://soscw.com/essay/35047.html