音乐数据传输控制

  1. —音乐数据传输控制—
    1. 1. 内存检测
    2. 2. 代码实例
      1. tbf.h
      2. tbf.cpp
      3. main.cpp

—音乐数据传输控制—

​ UDP协议没有流量控制手段,故在音乐发送端设计一个令牌桶来控制音乐发送的速率

1. 内存检测

​ 由于我设定音乐数据的传输是UDP协议,因此它自身不携带流量控制手段, 故需自己实现一个Demo用来控制音乐数据的发送速率,以此防止数据发送太快导致接收方来不及接受。

起初实现的流量控制Demo 存在double free 的问题

tbf泄漏检测

找到原因: 在destroy()中重复释放线程。

完善后内存泄漏问题得以解决。

tbf泄漏检测

2. 代码实例

附上代码:

tbf.h

#pragma once
#include <iostream>
#include <pthread.h>
#include <vector>
#include <algorithm>

using namespace std;


//工作函数
void* thr_work(void* p);

//令牌桶的组织结构
class mytbf_cl
{

public:

    friend void* thr_work(void* p);
    
    //初始化一个实例令牌桶
    mytbf_cl(const unsigned int& cps, const unsigned int& burst);
    
    //销毁job中所有令牌桶
    ~mytbf_cl();


    //取令牌数
    size_t fetchtoken(size_t size);
    
    //还回令牌数
    size_t returntoken(size_t size);
    
    //销毁
    void destroy();




private:
    long int pos;    //位置
    unsigned int cps;    //速率    这里 1个cps = 1个token
    unsigned int  burst;  //上限
    long int  token;  //令牌数

    pthread_t tid;
    pthread_mutex_t num_mut;
    pthread_cond_t  num_cond;

};

tbf.cpp

#include "tbf.h"
#include <iostream>
using namespace std;
//sleep
#include <unistd.h>
#include <string.h>
#include <pthread.h>



static const int MYTBF_MAX = 10;
static vector<mytbf_cl*> job(MYTBF_MAX, nullptr);    //存放多个令牌桶的容器

static pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER; //互斥量
//static pthread_once_t init_once = PTHREAD_ONCE_INIT;

static bool shutDown = false;


//令牌桶工作 这个函数在类外
void* thr_work(void* p)
{

    while (!shutDown)
    {
        cout << "shutDown111 : " << shutDown << endl;
    
        pthread_mutex_lock(&mut);
        for (auto& b : job)
        {
            if (b != nullptr)
            {
                pthread_mutex_lock(&b->num_mut);
                b->token += b->cps;
                if (b->token > b->burst)
                    b->token = b->burst;
                pthread_cond_broadcast(&b->num_cond);
                pthread_mutex_unlock(&b->num_mut);
    
            }
        }
        pthread_mutex_unlock(&mut);
        sleep(1);
    }
    
    cout << "shutDown : " << shutDown << endl;


    pthread_exit(NULL);

}




long int get_free_pos()
{

    auto iter = find(job.begin(), job.end(), nullptr);
    
    if (iter == job.end())
    {
        cout << "令牌桶容量已满,无法创建新的令牌桶" << endl;
        return -1;
    }
    long int pos = iter - job.begin();
    return pos;


}


mytbf_cl::mytbf_cl(const unsigned int& cps, const unsigned int& burst)  //构造函数
{


    int err;
    long int pos;
    
    this->cps = cps;
    this->burst = burst;
    this->token = 0;
    pthread_mutex_init(&this->num_mut, NULL);
    pthread_cond_init(&this->num_cond, NULL);


    pthread_mutex_lock(&mut);
    //查找job中空余位置
    pos = get_free_pos();
    if (pos < 0)
    {
        pthread_mutex_unlock(&mut);
        return;
    
    }
    this->pos = pos;
    job[pos] = this;
    
    pthread_mutex_unlock(&mut);
    
    //创建双线程
    err = pthread_create(&this->tid, NULL, thr_work, NULL);
    if (err)
    {
        cout << "pthread_create():Wrong" << endl;
        return;
    }

}



size_t mytbf_cl::fetchtoken(size_t size)
{
    long int n;

    if (size <= 0)
        return -1;


    pthread_mutex_lock(&this->num_mut);
    while (this->token <= 0)
    {
        pthread_cond_wait(&this->num_cond, &this->num_mut);
    }
    
    n = min((long int)size, this->token);
    this->token -= n;


    pthread_mutex_unlock(&this->num_mut);
    
    return n;


}

size_t mytbf_cl::returntoken(size_t size)
{

    if (size <= 0)
        return -1;
    
    pthread_mutex_lock(&this->num_mut);
    this->token += size;
    if (this->token > this->burst)
    {
        this->token = this->burst;
    }
    pthread_cond_broadcast(&this->num_cond);
    pthread_mutex_unlock(&this->num_mut);
    return size;

}


//摧毁数组中当前令牌桶
void mytbf_cl::destroy()
{


    cout << "执行销毁当前" << endl;
    
    pthread_cond_destroy(&this->num_cond);
    pthread_mutex_destroy(&this->num_mut);
    
    pthread_mutex_lock(&mut);
    job[pos] = nullptr;
    pthread_mutex_unlock(&mut);


    delete this;

}


//析构函数 销毁job中所有令牌桶
mytbf_cl::~mytbf_cl()
{

    shutDown = true;
    int res = pthread_join(tid, NULL);
    if (res != 0)
        cout << "pthread_join():" << strerror(errno) << endl;
    cout << "come in !!!!?";
    
    //for (auto& a : job)
    //{
    //    cout << "come in ?";
    //    if (a != nullptr)
    //    {
    
    //        cout << "mytbf pthread tid " << a->tid << ": exit....." << endl;
    //        pthread_mutex_destroy(&a->num_mut);
    //        pthread_cond_destroy(&a->num_cond);
    
    //        pthread_mutex_lock(&mut);
    //        a = nullptr;
    //        pthread_mutex_unlock(&mut);
    
    //        delete a;
    
    //    }
    
    //}
    pthread_mutex_destroy(&mut);

}

main.cpp

测试用例 在 linux控制台 传入两个参数 eg: ./mytbf1Demo.out /etc/magic

#include <cstdio>
#include "tbf.h"
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <signal.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <fcntl.h>
#include <string.h>
#include <unistd.h>

#define CPS 30
#define BUFSIZE 1024
#define BURST 100


int main(int argc, char** argv)
{
    int sfd, dfd = 1;
    char buf[BUFSIZE];
    int len, ret, pos, size;
    mytbf_cl* tbf = new mytbf_cl(CPS, BURST);
    if (argc < 2)
    {
        fprintf(stderr, "Usage...\n");
        exit(1);
    }

    do
    {
        sfd = open(argv[1], O_RDONLY);
        if (sfd < 0)
        {
            if (errno != EINTR)
            {
                perror("Open():");
                exit(1);
            }
        }
    
    } while (sfd < 0);


    while (1)
    {
        size = tbf->fetchtoken(BUFSIZE);  //取了size个令牌
        if (size < 0)
        {
            fprintf(stderr, "mytbf_fetchtoken():%s\n", strerror(-size));
            exit(1);
        }
    
        while ((len = read(sfd, buf, size)) < 0)   //读了len个令牌
        {
            if (errno == EINTR)
                continue;
            perror("Read()");
            break;
        }
        if (len == 0)
        {
            break;
        }
    
        //即将写之前
        if (size - len > 0)
            tbf->returntoken(size - len);

        pos = 0;
        while (len > 0)
        {
            ret = write(dfd, buf + pos, len);
            if (ret < 0)
            {
                if (errno == EINTR)
                    continue;
                perror("Write():");
                exit(1);
            }
            pos += ret;
            len -= ret;
        }
    
    }


    close(sfd);
    
    delete tbf;
    sleep(3);
    exit(0);

}

转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。可以在下面评论区评论,也可以邮件至 351134995@qq.com

×

喜欢就点赞,疼爱就打赏