—音乐数据传输控制—
UDP协议没有流量控制手段,故在音乐发送端设计一个令牌桶来控制音乐发送的速率
1. 内存检测
由于我设定音乐数据的传输是UDP协议,因此它自身不携带流量控制手段, 故需自己实现一个Demo用来控制音乐数据的发送速率,以此防止数据发送太快导致接收方来不及接受。
起初实现的流量控制Demo 存在double free 的问题
找到原因: 在destroy()中重复释放线程。
完善后内存泄漏问题得以解决。
2. 代码实例
附上代码:
tbf.h
#pragma once
#include <iostream>
#include <pthread.h>
#include <vector>
#include <algorithm>
using namespace std;
//工作函数
void* thr_work(void* p);
//令牌桶的组织结构
class mytbf_cl
{
public:
friend void* thr_work(void* p);
//初始化一个实例令牌桶
mytbf_cl(const unsigned int& cps, const unsigned int& burst);
//销毁job中所有令牌桶
~mytbf_cl();
//取令牌数
size_t fetchtoken(size_t size);
//还回令牌数
size_t returntoken(size_t size);
//销毁
void destroy();
private:
long int pos; //位置
unsigned int cps; //速率 这里 1个cps = 1个token
unsigned int burst; //上限
long int token; //令牌数
pthread_t tid;
pthread_mutex_t num_mut;
pthread_cond_t num_cond;
};
tbf.cpp
#include "tbf.h"
#include <iostream>
using namespace std;
//sleep
#include <unistd.h>
#include <string.h>
#include <pthread.h>
static const int MYTBF_MAX = 10;
static vector<mytbf_cl*> job(MYTBF_MAX, nullptr); //存放多个令牌桶的容器
static pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER; //互斥量
//static pthread_once_t init_once = PTHREAD_ONCE_INIT;
static bool shutDown = false;
//令牌桶工作 这个函数在类外
void* thr_work(void* p)
{
while (!shutDown)
{
cout << "shutDown111 : " << shutDown << endl;
pthread_mutex_lock(&mut);
for (auto& b : job)
{
if (b != nullptr)
{
pthread_mutex_lock(&b->num_mut);
b->token += b->cps;
if (b->token > b->burst)
b->token = b->burst;
pthread_cond_broadcast(&b->num_cond);
pthread_mutex_unlock(&b->num_mut);
}
}
pthread_mutex_unlock(&mut);
sleep(1);
}
cout << "shutDown : " << shutDown << endl;
pthread_exit(NULL);
}
long int get_free_pos()
{
auto iter = find(job.begin(), job.end(), nullptr);
if (iter == job.end())
{
cout << "令牌桶容量已满,无法创建新的令牌桶" << endl;
return -1;
}
long int pos = iter - job.begin();
return pos;
}
mytbf_cl::mytbf_cl(const unsigned int& cps, const unsigned int& burst) //构造函数
{
int err;
long int pos;
this->cps = cps;
this->burst = burst;
this->token = 0;
pthread_mutex_init(&this->num_mut, NULL);
pthread_cond_init(&this->num_cond, NULL);
pthread_mutex_lock(&mut);
//查找job中空余位置
pos = get_free_pos();
if (pos < 0)
{
pthread_mutex_unlock(&mut);
return;
}
this->pos = pos;
job[pos] = this;
pthread_mutex_unlock(&mut);
//创建双线程
err = pthread_create(&this->tid, NULL, thr_work, NULL);
if (err)
{
cout << "pthread_create():Wrong" << endl;
return;
}
}
size_t mytbf_cl::fetchtoken(size_t size)
{
long int n;
if (size <= 0)
return -1;
pthread_mutex_lock(&this->num_mut);
while (this->token <= 0)
{
pthread_cond_wait(&this->num_cond, &this->num_mut);
}
n = min((long int)size, this->token);
this->token -= n;
pthread_mutex_unlock(&this->num_mut);
return n;
}
size_t mytbf_cl::returntoken(size_t size)
{
if (size <= 0)
return -1;
pthread_mutex_lock(&this->num_mut);
this->token += size;
if (this->token > this->burst)
{
this->token = this->burst;
}
pthread_cond_broadcast(&this->num_cond);
pthread_mutex_unlock(&this->num_mut);
return size;
}
//摧毁数组中当前令牌桶
void mytbf_cl::destroy()
{
cout << "执行销毁当前" << endl;
pthread_cond_destroy(&this->num_cond);
pthread_mutex_destroy(&this->num_mut);
pthread_mutex_lock(&mut);
job[pos] = nullptr;
pthread_mutex_unlock(&mut);
delete this;
}
//析构函数 销毁job中所有令牌桶
mytbf_cl::~mytbf_cl()
{
shutDown = true;
int res = pthread_join(tid, NULL);
if (res != 0)
cout << "pthread_join():" << strerror(errno) << endl;
cout << "come in !!!!?";
//for (auto& a : job)
//{
// cout << "come in ?";
// if (a != nullptr)
// {
// cout << "mytbf pthread tid " << a->tid << ": exit....." << endl;
// pthread_mutex_destroy(&a->num_mut);
// pthread_cond_destroy(&a->num_cond);
// pthread_mutex_lock(&mut);
// a = nullptr;
// pthread_mutex_unlock(&mut);
// delete a;
// }
//}
pthread_mutex_destroy(&mut);
}
main.cpp
测试用例 在 linux控制台 传入两个参数 eg: ./mytbf1Demo.out /etc/magic
#include <cstdio>
#include "tbf.h"
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <signal.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <fcntl.h>
#include <string.h>
#include <unistd.h>
#define CPS 30
#define BUFSIZE 1024
#define BURST 100
int main(int argc, char** argv)
{
int sfd, dfd = 1;
char buf[BUFSIZE];
int len, ret, pos, size;
mytbf_cl* tbf = new mytbf_cl(CPS, BURST);
if (argc < 2)
{
fprintf(stderr, "Usage...\n");
exit(1);
}
do
{
sfd = open(argv[1], O_RDONLY);
if (sfd < 0)
{
if (errno != EINTR)
{
perror("Open():");
exit(1);
}
}
} while (sfd < 0);
while (1)
{
size = tbf->fetchtoken(BUFSIZE); //取了size个令牌
if (size < 0)
{
fprintf(stderr, "mytbf_fetchtoken():%s\n", strerror(-size));
exit(1);
}
while ((len = read(sfd, buf, size)) < 0) //读了len个令牌
{
if (errno == EINTR)
continue;
perror("Read()");
break;
}
if (len == 0)
{
break;
}
//即将写之前
if (size - len > 0)
tbf->returntoken(size - len);
pos = 0;
while (len > 0)
{
ret = write(dfd, buf + pos, len);
if (ret < 0)
{
if (errno == EINTR)
continue;
perror("Write():");
exit(1);
}
pos += ret;
len -= ret;
}
}
close(sfd);
delete tbf;
sleep(3);
exit(0);
}
转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。可以在下面评论区评论,也可以邮件至 351134995@qq.com