项目总览
mulNICtl 是一个多网络接口控制与传输实验工具,支持网络拓扑配置、数据流传输模拟以及实时监控与调度。项目目录结构如下:
1 | . |
项目架构图:

Requirements
项目需要3.10及以上版本的python解释器。相关python依赖可以通过以下指令配置:
1
pip3 install -r requirements.txt
项目需要rust环境来编译构建
stream-replay工具。相关环境可以通过以下指令配置:1
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
stream-replay是用来进行流量传输模拟的工具,在配置好rust环境后可以通过以下指令编译:1
2cd stream-replay
cargo build --release
Monitor端
功能
monitor端是实验的总控制端,主要功能包括:
- 对模拟传输实验进行总体调度,配置干扰流,数据流等参数
- 接收,存储回传来自
Solver的统计数据并进行可视化处理(可选)
用法
为了便于说明,我们把两个设备之间的链路记作两个设备无线接口名的元组,例如 ('wlan0', 'wlan1'), 这样的无线接口名可以通过在设备命令行输入 iw dev 来查看.
NOTICE: 以下操作应在项目根目录进行.
设置
monitor监控端:1
python3 tap.py -s
终端会列出所有连接的设备,此时可以对照具体设备配置进行连接检查
设置自定义名称的用户设备,例如设置设备
STA1:1
python3 tap.py -c 192.168.3.72 -n STA1
在config/topo下新建并编写
'{date}.txt'文件来表示网络拓扑。每个链路占据一行,格式如下:1
STA1 --wlan0-- --wlan1-- STA2
其中
--wlan0-- --wlan1--是STA1和STA2之间的链路,含义已在上面介绍monitor端的基本配置已经完成,用户可以在expSrc下新建并编写实验代码exp.py,然后在run_exp.py中添加搜索路径即可。 最后,通过python3 ./run_exp.py来开始实验。exp.py是进行数据采集实验的脚本,主要流程为:- 根据topo文件加载网络拓扑结构
- 配置业务流,干扰流的参数,包括数量,吞吐量,运行时间等
- 创建传输流,对tx进行指令下发
- 启动控制器(
Solver端) - 启动通信进程(通过
start_comm_process实现),接收回传的统计数据以及结果文件 - 分析数据并进行可视化处理(可选)
以下是一个简单的exp.py实验脚本的示例,用来根据topo文件构建链路并控制STA1向STA2发送文件数据流:
1 | import util.ctl as ctl |
注意事项
对于每次实验:
- 自行配置当天的拓扑文件
- 修改exp.py中的
date变量,并在exp.py新建启动函数 - 在
dpScript按实验日期新建数据目录
Solver端
Function
Solver是使用rust编写的控制端程序,主要功能为:
- 收集与回传数据: 使用
qos_collect接收受控设备回传的统计数据(例如rtt,channel_rtt,tx_part),简单处理格式后回传给监控端(monitor) - 决策生成与控制: 根据接收到的统计数据以及决策算法进行决策(相关决策算法位于
./cores目录),生成action command,将控制命令发送给受控设备
Structure
目录 /solver 结构如下所示:
1 | . |
solver可以通过以下指令编译:
1 | cd ./solver |
Solver详解
以下来说明Solver各个子目录的功能:
./api:子文件为ipc.rs,定义了负责和单个受控设备通信的结构体ipc_controller以及和多个受控设备通信的结构体ipc_Manager,包括了一系列收集数据、发送指令的方法./cores:该目录定义了一系列根据统计数据生成动作的决策方法。主进程根据实时统计数据在./cores中进行不同的的算法选择。决策实现如下:文件 决策方法 back_switch_solver.rsBackSwitchSolverfile_restrict.rsFileSolvergreen_solver.rsGSolverprediction.rs定义了一系列线性回归的方法,被以上三个方法调用./types:该目录定义了一系列必要的数据结构和参数,其中,parameter.rs定义了Solver的超参数。./tests:该目录定义了一系列测试用例,分别对ipc和线性回归函数进行测试。
Hyper Parameter
1 | pub struct HyperParameter<'a> { |
throttle_low和throttle_high分别是节流值throttle的下限和上限。backward_threshold是一个范围在 0 到 1 之间的阈值比例,用于确定信道是否可以开始切换回单信道模式。具体来说,更高的值意味着更激进的切换。epsilon_rtt是用于确定信道是否平衡的 RTT 差值的阈值。scale_factor是用于调整两个信道之间数据tx_part的缩放因子。degration_threshold是用于确定信道是否退化的 RTT 差值的阈值。degration_tx_part_threshold是用于确定信道是否退化的tx_part的阈值。wait_slots是在Solver开始调整tx_part之前需要等待的时隙数量。maximum_his_len是Solver历史记录的最大长度。ports_tobe_pop是待移除的端口,即这些端口不受控制。running_duration是Solver的运行时长。ctl_time是Solver开始控制的时间。his_back_time是用于确定信道是否曾经处于双信道模式的Solver历史的时间间隔。时间间隔越长,Solver从单信道模式切换到双信道模式就越保守。back_off_rtt_threshold_factor是用于确定参考回退时间的阈值,即该因子越低,回退越不可能发生。balance_channel_rtt_thres是 RTT 差值的最大阈值。如果信道的 RTT 大于此阈值,Solver将选择最大变化步长或决定执行基于线性拟合的控制。balance_time_thres是用于确定信道是否平衡的历史时间间隔。balance_tx_part_thres是tx_part的阈值。只有当tx_part大于此阈值时,这些历史记录才会用于基于线性拟合的控制。balance_rtt_thres是 RTT 差值的阈值。只有当 RTT 差值大于此阈值时,这些历史记录才会用于基于线性拟合的控制。
用法
Solver的运行状态高度依赖于以上的超参数集,超参数可以通过设置HYPER_PARAMETER。
使用这种方法启动控制器时必须指定以下命令行参数:
target-ips:link_name(tos@port)到链路ipc地址(ip:ipc_port)的映射name2ipc:stream_name(tos@port)到link_name(wlan_PC_phone)的映射base-info:所有stream的配置信息monitor-ip:监控端的ip地址
Solver可以通过构建命令行字符串+subprocess启动,但是一般通过监控脚本exp.py调用conn.batch()的形式启动,如以下代码所示:
1 | import base64 |
NOTICE:Solver可以在监控设备所在子网内的任何设备上运行,命令行参数中,监控设备的 IP 地址moniter-ip应为监控设备无线网卡的 IP 地址,该地址对应的监控设备显示Solver结果的绘图。
Stream-Replay传输系统
简介
Stream-Replay是一个数据传输系统,包括tx端(数据发送端)和rx端(数据接收端),用于模拟真实网络环境下的wifi传输
Requirements
Python3, numpy
特性
根据
*.npy文件生成UDP stream.可以通过改变
manifest.json文件参数实现改变stream的配置. 以下是一个manifest.json的示例:
1 | { |
- 支持 IPC(进程间通信)以实现对网络的实时监控和控制。
用法
可以通过以下指令编译运行TX和RX端程序
Tx:
1 | cargo run --bin stream-replay <manifest_file> <target_ip_address> <duration> [--ipc-port <IPC_PORT>] |
Rx:
1 | cargo run --bin stream-replay-rx <port> <duration> [calc-rtt] |
数据实时图
Stream-Replay系统对多个live stream进行IPC节流控制(throttle contorl),并获得RTT数据反馈

TX端
简介
TX端是数据传输实验的发射端,功能包括:
- 向RX端传输数据
- 接受Solver端发来的tx_part,throttle调度
- 向Solver端回传平均rtt等统计数据
文件结构
- src/
- statistic/
- mod.rs
- rtt_records.rs
- broker.rs
- conf.rs
- dispatcher.rs
- ipc.rs
- lib.rs
- link.rs
- main.rs
- rtt.rs
- source.rs
- throttle.rs
- tx_part_ctl.rs
- statistic/
- Cargo.toml
主要文件介绍
statistic/rtt_recoder.rs
- 实现了RTT的记录与管理功能
- 结构
- 定义了单条RTT记录结构
RTTEntry以及管理多条RTTEntry的结构RTTRecord
- 定义了单条RTT记录结构
- 函数
- 实现了
statistic方法用于计算当前设备的平均RTT及信道平均RTT等参数
- 实现了
rtt.rs
- 实现了RTT数据的记录与管理
- 结构
- 定义了管理RTT测量记录的结构体
RttRecorder,包含rtt_records、端口、服务名称,以及维护记录线程和接收线程
- 定义了管理RTT测量记录的结构体
- 函数
- 定义了线程函数
record_thread,负责记录发送包的序列号 - 定义了线程函数
pong_recv_thread,负责接收ACK并计算RTT,将结果记录至rtt_records及日志文件
- 定义了线程函数
conf.rs
- 实现了发射流参数的配置管理
- 结构
- 定义了单条流发射配置结构体
StreamParams - 定义了管理
TX端全局发射配置的结构体Manifest,该结构由所有StreamParams集合及其他配置参数组成
- 定义了单条流发射配置结构体
- 函数
-实现了对配置合法性进行校验的方法conf::validate
dispatcher.rs
- 实现了网络数据包的分发逻辑,为每个链路创建了数据发送通道
- 函数
- 定义了分发器函数
dispatch(),为多个网络链路创建UDPsocket,并通过socket_thread()为每条链路创建独立发送线程,返回将tx_ipaddr映射至发送通道(flume::Sender<PacketStruct>)的哈希表,并为数据发送socket设置了防阻塞机制
- 定义了分发器函数
link.rs
- 实现了网络链路配置管理
- 结构
- 定义了数据结构
Link,存储tx端和rx端的IP地址,并实现其反序列化trait
- 定义了数据结构
source.rs
- 实现了流数据的管理与传输
- 结构
- 定义了流管理器
SourceManager
- 定义了流管理器
- 函数
- 实现方法
start()用于开始数据传输 - 实现方法
set_tx_parts()用于接受控制指令 - 实现方法
Statistic()用于收集统计数据 - 以上三个方法用于响应
Solver端的控制指令 - 定义了线程函数
stream_thread,模拟实时数据发送(如投屏流) - 定义了线程函数
source_thread,处理预定义静态数据发送(如文件流) SourceManager::start根据npy文件协议头选择并启动相应线程
- 实现方法
ipc.rs
- 实现了tx端的IPC通信机制
- 结构
- 定义了IPC守护进程
IPCDaemon,负责处理来自其他进程的请求并返回结果
- 定义了IPC守护进程
- 函数
handle_requests方法接收请求并根据枚举类型处理,目前支持Solver的三类请求:throttle调节、tx_part调节、Statistics统计信息请求start_loop方法启动IPC守护进程,监听Solver的请求
main.rs
- 实现了发送端主进程逻辑
- 结构
- 定义了命令行参数结构体
ProgArgs,封装TX端运行所需参数,包括manifest配置文件路径、IPC守护进程循环时间duration及监听端口ipc_port
- 定义了命令行参数结构体
- 函数
- 定义了tx端的入口函数
main(),该函数通过ProgArgs解析命令行参数并执行TX端流程:- 读取
manifest清单,构造StreamParam列表并启动发射线程 - 根据
duration和ipc_port启动IPC守护进程,监听Solver请求
- 读取
- 定义了tx端的入口函数
RX端
简介
RX端是数据传输实验的接收端,功能包括:
- 接收来自TX的数据包,并返回ACK
- 本地记录rtt和stuttering等统计数据
文件结构
- src/
- statistic/
- mod.rs
- stuttering.rs
- destination.rs
- lib.rs
- main.rs
- record.rs
- statistic/
- Cargo.toml
主要文件介绍
stuttering.rs
- 实现了ACK时间戳的记录以及抖动率的计算
- 结构
- 定义了
Stutter结构来维护所有ACK packet的发送时间戳
- 定义了
- 函数
- 为
Stutter实现了update方法来更新ACK时间戳,实现了get_stuttering方法来计算抖动率
- 为
record.rs
- 实现了数据包的记录,处理,以及接收状态的检查
- 结构
- 定义了对单个数据包进行记录的结构
RecvRecord以及对全局数据进行记录的结构RecvData,后者以seq号为索引存储必要的RecvData
- 定义了对单个数据包进行记录的结构
- 函数
RecvRecord::record实现了对数据包(packet)进行记录RecvRecord::determine_complete负责对各个链路传输状态(传输完成情况)进行评估RecvRecord::gather实现了对离散的packet进行聚合
destination.rs
- 实现了数据包接收逻辑,包括:
- 创建接受数据的udp socket,监听指定端口
- 接收数据包并存储到
RecvData中 - 创建pong socket并发送ACK
- 数据包完整时进行数据重组
- 结构
- 定义了命令行参数结构体
Args,包含了监听端口,持续时间等参数,用于设置接收线程recv_thread的工作状态
- 定义了命令行参数结构体
- 函数
- 定义了接收线程函数
recv_thread,实现了rx端的接收数据逻辑,用于持续接收数据包。通过调用handle_rtt记录并处理rtt数据,调用send_ack向数据源发送ACK确认
- 定义了接收线程函数
main.rs
main函数是rx端的入口函数,实现了以下流程:
- 从命令行中提取参数,包括监听端口以及休眠间隔
- 开启接收线程
destination::recv_thread - 计算并记录丢包率,抖动率等数据