简体中文

基于RDTAPIs的文件下载

基于RDT APIs的文件下载 | TUTK P2P SDK 开发指南

一、IO交互

1-250Z9153613641.png

文件下载过程中的IO交互主要分为以下几个步骤:

1、APP通过 avSendIOCtrl(IOCTRL_FILEMANAGER_FILE_LIST_REQ) 查询指定时间内的文件列表

2、设备通过 avSendIOCtrl(IOCTRL_FILEMANAGER_FILE_LIST_RESP) 回复文件列表

3、用户选择文件后,APP通过 avSendIOCtrl(IOCTRL_FILEMANAGER_FILE_DOWNLOAD_REQ) 通知设备

4、设备根据资源情况,通过 avSendIOCtrl(IOCTRL_FILEMANAGER_FILE_DOWNLOAD_RESP) 告知APP使用的通道数和API类型

5、双方创建通道并进行数据传输

6、传输完成后关闭通道


二、RDT通道的创建和销毁

创建(APP端/设备端通用)
int createChannelForDownload(int sid, int iotc_channel_id) {    return RDT_Create(sid, 5000, iotcChannelId); }
销毁(APP端/设备端通用)
void destoryChannelOfDownload(int rdt_id) {    if (rdt_id < 0)        return;    RDT_Abort(rdt_id); }

三、数据的传输

RDT协议本身提供的接口只有Read和Write,传输过程存在粘包的情况,需要另外设计切包组包的机制。每个数据包分为包头、数据和包尾,格式参考:公版RDT帧定义
设备端
说明:RDT的数据传输是可靠的传输,每次调用RDT_Write,都将数据写入本地的发送缓存区,所以等所有的数据都使用RDT_Write写入后,需要检查发送缓存区是否为空,为空,则说明数据已经送完。
写入数据相关(伪)代码:
#define RDT_HEADER_FRAMEBEGIN_ADDR 0 #define RDT_HEADER_FILENAME_ADDR 4 #define RDT_HEADER_FILESIZE_ADDR 68 #define RDT_HEADER_FRAMESIZE_ADDR 72 #define RDT_HEADER_ENDFLAG_ADDR 76 #define RDT_FRAME_BUFFER_ADDR 77 #define RDT_FRAME_BUFFER_SIZE 10243 #define RDT_FRAME_HEADER_SIZE 77 #define RDT_PAYLOAD_WRITE_POSTION 77 #define RDT_FRAME_TAIL_SIZE 2 #define RDT_FILENAME_MAX_LEN (RDT_HEADER_FILESIZE_ADDR - RDT_HEADER_FILENAME_ADDR) // 64字节 int createChannelForDownload(int sid, int iotc_channel_id); void destroyChannelOfDownload(int rdt_id); union typeXChange {    uint32_t uint32_data;    char char_data[4]; }; bool swapUint32ToCharBuffer(const uint32_t uint32_data, char* buffer, size_t bufferSize) {    if (!buffer || bufferSize != 4)        return false;    typeXChange tmp;    tmp.uint32_data = htonl(uint32_data); // 转换为网络字节序(大端)    memcpy(buffer, tmp.char_data, 4);    return true; }
生成帧头
void createPacketHeader(char* buffer_write_postion, size_t bufferSize, const char* fileName, uint32_t fileSize, uint32_t payloadSize,bool endFlag) {    // 校验缓冲区是否足够存储header    if (!buffer_write_postion || bufferSize < RDT_FRAME_HEADER_SIZE)        return;    memset(buffer_write_postion, 0, RDT_FRAME_HEADER_SIZE);        // 写入帧头标识 IOTC    buffer_write_postion[0] = 'I';    buffer_write_postion[1] = 'O';    buffer_write_postion[2] = 'T';    buffer_write_postion[3] = 'C';    // 拷贝文件名:限制长度,手动加终止符    strncpy(buffer_write_postion + RDT_HEADER_FILENAME_ADDR, fileName, RDT_FILENAME_MAX_LEN);    buffer_write_postion[RDT_HEADER_FILESIZE_ADDR - 1] = '\0'; // 确保终止    // 写入文件大小、payload大小(转换为uint32_t,避免截断)    swapUint32ToCharBuffer(fileSize, buffer_write_postion + RDT_HEADER_FILESIZE_ADDR, 4);    swapUint32ToCharBuffer(payloadSize, buffer_write_postion + RDT_HEADER_FRAMESIZE_ADDR, 4);        // 写入结束标志    buffer_write_postion[RDT_HEADER_ENDFLAG_ADDR] = endFlag ? 1 : 0; }
生成帧尾
void createPacketTail(char* buffer_write_postion, size_t bufferSize) {    if (!buffer_write_postion || bufferSize < RDT_FRAME_TAIL_SIZE)        return;    buffer_write_postion[0] = 'G';    buffer_write_postion[1] = 'C'; }
发送单个文件
int sendOneFile2Client(int rdt_id, file& f, bool isLastFile) {    int ret = 0;    bool rdt_endflag = false;    size_t fileSize = f.size();    std::string fileName = f.name();    uint32_t rdt_file_size = static_cast(fileSize); // 转为uint32_t(若文件超4G需改用uint64_t)    const size_t buffer_for_payload_size = RDT_FRAME_BUFFER_SIZE - RDT_FRAME_HEADER_SIZE - RDT_FRAME_TAIL_SIZE;    // 检查文件是否打开成功    if (!f.open()) {        fprintf(stderr, "File %s open failed\n", fileName.c_str());        return -1;    }    char* buffer = new char[RDT_FRAME_BUFFER_SIZE]();    if (!buffer) {        ret = -ENOMEM;        goto LAB_SEND_RETURN_ERROR;    }    while (true) {        // 读取payload:用ssize_t区分成功/失败        ssize_t readSize = f.read(buffer + RDT_PAYLOAD_WRITE_POSTION, buffer_for_payload_size);        if (readSize < 0) { // 读取错误            fprintf(stderr, "File %s read failed, err=%zd\n", fileName.c_str(), readSize);            delete[] buffer;            ret = -2;            goto LAB_SEND_RETURN_ERROR;        } else if (readSize == 0) { // 读取到文件尾            delete[] buffer;            break;        }        // 判断是否为最后一包        if (readSize < buffer_for_payload_size) {            if (isLastFile) {                rdt_endflag = true;            }        }        // 构建header和tail        createPacketHeader(buffer, RDT_FRAME_BUFFER_SIZE, fileName.c_str(), rdt_file_size, static_cast(readSize), rdt_endflag);        createPacketTail(buffer + RDT_FRAME_HEADER_SIZE + readSize, RDT_FRAME_TAIL_SIZE);        //发送完整包长度(header + payload + tail)        size_t send_len = RDT_FRAME_HEADER_SIZE + readSize + RDT_FRAME_TAIL_SIZE;        ret = RDT_Write(rdt_id, buffer, send_len);        if (ret < 0) {            fprintf(stderr, "RDT_Write failed, err=%d\n", ret);            delete[] buffer;            goto LAB_SEND_RETURN_ERROR;        }        // 最后一包:刷新并退出        if (rdt_endflag) {            RDT_Flush(rdt_id);            break;        }    }    if(buffer) delete[] buffer; // 释放缓冲区    buffer = NULL;    f.close();    return 0; // 成功返回0 LAB_SEND_RETURN_ERROR:    if(buffer) delete[] buffer; // 释放缓冲区:    f.close(); // 确保文件关闭,避免泄漏    return ret; }
发送文件列表
void sendFileList(void* arg, int sid, int iotc_channel_id) {    int rdt_id = createChannelForDownload(sid, iotc_channel_id);    if (rdt_id < 0) {        fprintf(stderr, "Create channel failed, rdt_id=%d\n", rdt_id);        return;            }        int ret = 0;    FileList* fileList = static_cast(arg);    int fileCount = fileList->count();    int fileIndex = 1;    //遍历文件,发送文件列表    for (auto& file : fileList->getFiles()) {        bool isLastFile = (fileCount == fileIndex);        ret = sendOneFile2Client(rdt_id, file, isLastFile);        if (ret < 0) {            fprintf(stderr, "Send file %s error %d\n", file.name().c_str(), ret);            break;                    }        fileIndex++;    }        // 检查发送队列是否清空    if (ret == 0) {        do {            st_RDT_Status status;            ret = RDT_Status_Check(rdt_id, &status);            if (ret < 0) {                fprintf(stderr, "RDT_Status_Check failed, err=%d\n", ret);                break;            }            if (status.BufSizeInSendQueue == 0) {                break;            } else {                usleep(100 * 1000); // 替代msleep,跨平台            }        } while (true);    }        // 关闭通道    destroyChannelOfDownload(rdt_id); }
APP端
接收端也需要按照对应的格式进行切包和解析数据,如果切包有误,则可能导致数据解析异常。
读取通道数据和解析方法(伪)代码:
//读数据头 #define HEADER_SIZE 77 #define RDT_BUFFER_SIZE 20480 int createChannelForDownload(int sid, int iotc_channel_id); void destroyChannelOfDownload(int rdt_id); // 解析RDT Header bool parseRDTHeader(const char* headerBuffer, std::string& fileName, int& dataLength, bool& endFlag) {    // 示例:从headerBuffer解析fileName、dataLength、endFlag    fileName = "example.txt";    dataLength = 1024;    endFlag = false;    return true; } // 读取完整的RDT Header int readRDTHeader(int rdt_id, char* headerBuffer, int bufferSize) {    if (rdt_id < 0 || !headerBuffer || bufferSize < HEADER_SIZE) {        fprintf(stderr, "readRDTHeader: invalid params\n");        return -1;    }    memset(headerBuffer, 0, HEADER_SIZE); // 仅清空Header部分,避免越界    int restSize = HEADER_SIZE;    int headerIndex = 0;    int retryCount = 0; // 超时重试计数    do {        //传入rdt_id参数        int size = RDT_Read(rdt_id, headerBuffer + headerIndex, restSize, 1000);        if (size > 0) {            headerIndex += size;            restSize -= size;            retryCount = 0; // 成功读取,重置重试计数        } else if (size == RDT_ER_TIMEOUT) {            retryCount++;            if (retryCount >= MAX_READ_RETRY) { // 超过最大重试次数,退出                fprintf(stderr, "readRDTHeader: timeout after %d retries\n", MAX_READ_RETRY);                return RDT_ER_TIMEOUT;            }            usleep(10 * 1000); // 超时后短暂休眠,避免频繁重试        } else { // 其他错误(如通道关闭)            fprintf(stderr, "readRDTHeader: RDT_Read error %d\n", size);            return size;        }    } while (restSize > 0);    return 0; }
读取二进制数据并保存到文件
int readBinaryDataAndSaveOneFrame(file& f, int rdt_id,char* dataBuffer, int bufferSize,int dataLength) {    // bufferSize需大于0,dataLength需大于0    if (rdt_id < 0 || !dataBuffer || bufferSize <= 0 || dataLength <= readbinarydataandsaveoneframe:="" invalid="" return="" if="" file="" not="" int="" restdatasize="" dataindex="0;" retrycount="0;" while=""> 0) {        // 动态调整读取大小:取bufferSize和剩余数据的较小值        int readSize = (bufferSize < restDataSize) ? bufferSize : restDataSize;        int size = RDT_Read(rdt_id, dataBuffer + dataIndex, readSize, 1000);        if (size > 0) {            // 写入文件:仅写入实际读取的size字节            int writeRet = f.write(dataBuffer + dataIndex, size);            if (writeRet < 0) {                fprintf(stderr, "readBinaryDataAndSaveOneFrame: file write error\n");                return -3;            }            restDataSize -= size;            dataIndex += size;            retryCount = 0;        } else if (size == RDT_ER_TIMEOUT) {            retryCount++;            if (retryCount >= MAX_READ_RETRY) {                fprintf(stderr, "readBinaryDataAndSaveOneFrame: timeout\n");                return RDT_ER_TIMEOUT;            }            usleep(100 * 1000);        } else { // 其他错误            fprintf(stderr, "readBinaryDataAndSaveOneFrame: RDT_Read error %d\n", size);            return size;        }    }    return 0; }
读取尾标识
int readTail(int rdt_id) {    const int TAIL_SIZE = 2;    char buffer[TAIL_SIZE] = {0};    int restSize = TAIL_SIZE;    int bufferIndex = 0; // 初始化偏移量    int retryCount = 0;    do {        int size = RDT_Read(rdt_id, buffer + bufferIndex, restSize, 1000);        if (size > 0) {            bufferIndex += size;            restSize -= size;            retryCount = 0;        } else if (size == RDT_ER_TIMEOUT) {            retryCount++;            if (retryCount >= MAX_READ_RETRY) {                fprintf(stderr, "readTail: timeout\n");                return RDT_ER_TIMEOUT;            }            usleep(100 * 1000);        } else {            fprintf(stderr, "readTail: RDT_Read error %d\n", size);            return size;        }    } while (restSize > 0);    // 校验尾标识    if (buffer[0] != 'G' || buffer[1] != 'C') {        fprintf(stderr, "readTail: invalid tail (expected GC, got %c%c)\n", buffer[0], buffer[1]);        return -1;    }    return 0; }
接收文件主逻辑
int recvFilesFromChannel(int rdt_id) {    if (rdt_id < 0) {        fprintf(stderr, "recvFilesFromChannel: invalid rdt_id\n");        return -1;    }    bool endFlag = false;    int ret = 0;    file f;    std::string fileName, lastFileName;    char headerBuffer[HEADER_SIZE] = {0};    char dataBuffer[RDT_BUFFER_SIZE] = {0};    while (!endFlag) {        // 1. 读取Header        ret = readRDTHeader(rdt_id, headerBuffer, sizeof(headerBuffer));        if (ret < 0) {            fprintf(stderr, "recvFilesFromChannel: read header failed %d\n", ret);            break;        }        // 2. 解析Header        int dataLength = 0;        if (!parseRDTHeader(headerBuffer, fileName, dataLength, endFlag)) {            fprintf(stderr, "recvFilesFromChannel: parse header failed\n");            ret = -2;            break;        }        // 3. 处理文件打开/切换        if (!f.isOpen()) {            if (!f.open(fileName)) { // 检查文件打开结果                fprintf(stderr, "recvFilesFromChannel: open file %s failed\n", fileName.c_str());                ret = -3;                break;            }            lastFileName = fileName;            fprintf(stdout, "recvFilesFromChannel: open file %s\n", fileName.c_str());        } else {            if (fileName != lastFileName) {                f.close(); // 关闭旧文件                if (!f.open(fileName)) {                    fprintf(stderr, "recvFilesFromChannel: open new file %s failed\n", fileName.c_str());                    ret = -3;                    break;                }                lastFileName = fileName;                fprintf(stdout, "recvFilesFromChannel: switch to file %s\n", fileName.c_str());            }        }        // 4. 读取二进制数据并保存        ret = readBinaryDataAndSaveOneFrame(f, rdt_id, dataBuffer, RDT_BUFFER_SIZE, dataLength);        if (ret < 0) {            fprintf(stderr, "recvFilesFromChannel: read data failed %d\n", ret);            break;        }        // 5. 读取并校验尾标识        ret = readTail(rdt_id);        if (ret < 0) {            fprintf(stderr, "recvFilesFromChannel: read tail failed %d\n", ret);            break;        }    }    // 确保文件关闭,避免资源泄漏    if (f.isOpen()) {        f.close();        fprintf(stdout, "recvFilesFromChannel: close file %s\n", lastFileName.c_str());    }    return ret; }
接收线程
struct ChannelParams { // 定义参数结构体,传递外部数据    int sid;    int iotc_channel_id; }; void createRDTChannelAndSaveFiles(void* arg) {    if (!arg) {        fprintf(stderr, "createRDTChannelAndSaveFiles: arg is null\n");        return;    }    ChannelParams* params = static_cast(arg);    int rdt_id = createChannelForDownload(params->sid, params->iotc_channel_id);    if (rdt_id < 0) {        fprintf(stderr, "createRDTChannelAndSaveFiles: create channel failed %d\n", rdt_id);        return;    }    // 接收数据并处理错误    int ret = recvFilesFromChannel(rdt_id);    if (ret < 0) {        fprintf(stderr, "createRDTChannelAndSaveFiles: recv files failed %d\n", ret);    } else {        fprintf(stdout, "createRDTChannelAndSaveFiles: recv files success\n");    }    destroyChannelOfDownload(rdt_id); }

四、结束标志的判断

在文件传输过程中,结束标志的判断非常重要。根据前面的代码实现,结束标志的处理主要体现在以下几个方面:
  • 包尾标识:每个数据包的结尾都有"GC"标识,用于确认数据包的完整性
  • 文件结束标志:当传输最后一个文件时,会设置endflag为1
  • 缓存区检查:发送完成后,通过RDT_Status_Check检查发送缓存区是否为空
  • 读取大小判断:当读取的数据大小小于缓冲区大小时,认为是文件的最后一包
这些机制共同确保了文件传输的可靠性和完整性,避免了数据丢失或传输不完整的问题。

即刻开启您的物联网之旅

联系解决方案专家
Kalay App
资讯安全白皮书
全球专利布局
解决方案
新闻动态
公司动态
行业资讯
媒体报道
永续发展
经营者的话
社会参与
环境永续
公司治理

+86 755 27702549

7×24小时服务热线

法律声明 隐私权条款

关注“TUTK”

TUTK服务尽在掌握

© 2022 物联智慧科技(深圳)有限公司版权所有粤ICP备14023641号
在线咨询
扫一扫

TUTK服务尽在掌握

全国免费服务热线
+86 755 27702549

返回顶部