Libeio--libeio初始化,REQ/RES队列,锁,线程
libeio数据
libeio使用的数据主要包括多线程(主线程和worker线程),锁(互斥锁和条件锁),队列(reqqueue和resqueue)。
libeio 初始化
int eio_init (void (*want_poll)(void), void (*done_poll)(void))
eioinit函数:初始化libeio库。成功返回0,失败返回-1并且设置合适的errno值。 1. 初始化libeio中的一些全局结构,比如:reqqueue,resqueue,以及各种互斥量等。 2. 保存外界传入的两个回调函数:wantpoll和done_poll。这两个函数都是边缘触发函数
etp_init (void (*want_poll)(void), void (*done_poll)(void))
{
//初始化三个互斥量wrklock, reslock, reqlock和一个条件变量reqwait
X_MUTEX_CREATE (wrklock);
X_MUTEX_CREATE (reslock);
X_MUTEX_CREATE (reqlock);
X_COND_CREATE (reqwait);
/*
*初始化两个队列:
* 1.eio_submit把request放入req_queue
* 2.eio_poll从res_queue中取出数据
*/
reqq_init (&req_queue);
reqq_init (&res_queue);
//work线程链表
wrk_first.next =
wrk_first.prev = &wrk_first;
/*
* number of worker threads currently running.
*/
started = 0;
/*
* 表示处于COND_WAIT状态下的空闲线程数
*/
idle = 0;
/*
* nreqs: number of requests currently handled by libeio.
* This is the total number of requests that have been submitted to libeio, but not yet destroyed.
*/
nreqs = 0;
/*
* nready : number of ready requests,
* i.e. requests that have been submitted but have not yet entered the execution phase.
*/
nready = 0;
/*
* npending : number of pending requests,
* i.e. requests that have been executed and have results,
* but have not been finished yet by a call to eio_poll)
*/
npending = 0;
//设置回调函数
want_poll_cb = want_poll;
done_poll_cb = done_poll;
return 0;
}
REQ队列和RES队列
etb_reqq结构体
typedef struct eio_req eio_req;
#define ETP_REQ eio_req
/*
* a somewhat faster data structure might be nice, but
* with 8 priorities this actually needs <20 insns
* per shift, the most expensive operation.
*/
typedef struct {
ETP_REQ *qs[ETP_NUM_PRI], *qe[ETP_NUM_PRI]; /* qstart, qend */
int size;
} etp_reqq;
定义两个etp_reqq全局变量
static etp_reqq req_queue;
static etp_reqq res_queue;
初始化队列
static void ecb_noinline ecb_cold
reqq_init (etp_reqq *q)
{
int pri;
/*
* etp_reqq包含ETP_NUM_PRT个指向ETP_REQ的指针数组
* 每个数组成员的头指针和尾指针指向0
*/
for (pri = 0; pri < ETP_NUM_PRI; ++pri)
q->qs[pri] = q->qe[pri] = 0;
q->size = 0;
}
把request放入到队列中
static int ecb_noinline
reqq_push (etp_reqq *q, ETP_REQ *req)
{
int pri = req->pri;
req->next = 0;
/*
* 若qe[pri]已有数据则该更新队列的尾指针指向该request
* 若无数据则队列头指针和尾指针都指向该request
*/
if (q->qe[pri])
{
q->qe[pri]->next = req;
q->qe[pri] = req;
}
else
q->qe[pri] = q->qs[pri] = req;
return q->size++;
}
把request从队列中取出
static ETP_REQ * ecb_noinline
reqq_shift (etp_reqq *q)
{
int pri;
if (!q->size)
return 0;
--q->size;
for (pri = ETP_NUM_PRI; pri--; )
{
ETP_REQ *req = q->qs[pri];
if (req)
{
/* 队列数据取完,头和尾指针指向0 */
if (!(q->qs[pri] = (ETP_REQ *)req->next))
q->qe[pri] = 0;
return req;
}
}
/* 无数据可取,异常退出 */
abort ();
}
libeio中的锁
#include <pthread.h>
#define sigset_t int
#define sigfillset(a)
#define pthread_sigmask(a,b,c)
#define sigaddset(a,b)
#define sigemptyset(s)
typedef pthread_mutex_t xmutex_t;
#define X_MUTEX_INIT PTHREAD_MUTEX_INITIALIZER
#define X_MUTEX_CREATE(mutex) pthread_mutex_init (&(mutex), 0)
#define X_LOCK(mutex) pthread_mutex_lock (&(mutex))
#define X_UNLOCK(mutex) pthread_mutex_unlock (&(mutex))
typedef pthread_cond_t xcond_t;
#define X_COND_INIT PTHREAD_COND_INITIALIZER
#define X_COND_CREATE(cond) pthread_cond_init (&(cond), 0)
#define X_COND_SIGNAL(cond) pthread_cond_signal (&(cond))
#define X_COND_WAIT(cond,mutex) pthread_cond_wait (&(cond), &(mutex))
#define X_COND_TIMEDWAIT(cond,mutex,to) pthread_cond_timedwait (&(cond), &(mutex), &(to))
在eiosubmit中的部分代码: 1.reqlock加锁进行数据更新和reqqpush操作然后reqlock解锁 2.发送条件锁的信号给worker线程
X_LOCK (reqlock);
++nreqs;
++nready;
reqq_push (&req_queue, req);
X_COND_SIGNAL (reqwait);
X_UNLOCK (reqlock);
etppoll中的部分代码: 1.reqlock加锁进行数据更新和reqqshift操作然后reqlock解锁
X_LOCK (reslock);
req = reqq_shift (&res_queue);
if (req)
{
--npending;
if (!res_queue.size && done_poll_cb)
done_poll_cb ();
}
X_UNLOCK (reslock);
worker线程中使用XCONDWAIT或XCONDTIMEDWAIT来接受条件锁信号
if (idle <= max_idle)
/* we are allowed to idle, so do so without any timeout */
X_COND_WAIT (reqwait, reqlock);
else
{
/* initialise timeout once */
if (!ts.tv_sec)
ts.tv_sec = time (0) + idle_timeout;
if (X_COND_TIMEDWAIT (reqwait, reqlock, ts) == ETIMEDOUT)
ts.tv_sec = 1; /* assuming this is not a value computed above.,.. */
}
work线程
线程初始化:
static void
etp_maybe_start_thread (void)
{
if (ecb_expect_true (etp_nthreads () >= wanted))
return;
/* todo: maybe use idle here, but might be less exact */
if (ecb_expect_true (0 <= (int)etp_nthreads () + (int)etp_npending () - (int)etp_nreqs ()))
return;
etp_start_thread ();
}
通过calloc分配work线程资源并挂在wrk_first双向链表上
static etp_worker wrk_first; /* NOT etp */
static void ecb_cold
etp_start_thread (void)
{
etp_worker *wrk = calloc (1, sizeof (etp_worker));
/*TODO*/
assert (("unable to allocate worker thread data", wrk));
X_LOCK (wrklock);
if (xthread_create (&wrk->tid, etp_proc, (void *)wrk))
{
wrk->prev = &wrk_first;
wrk->next = wrk_first.next;
wrk_first.next->prev = wrk;
wrk_first.next = wrk;
++started;
}
else
free (wrk);
X_UNLOCK (wrklock);
}
创建属性detached(不用考虑线程资源的回收)的线程
static int
xthread_create (xthread_t *tid, void *(*proc)(void *), void *arg)
{
int retval;
pthread_attr_t attr;
pthread_attr_init (&attr);
pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
retval = pthread_create (tid, &attr, proc, arg) == 0;
pthread_attr_destroy (&attr);
return retval;
}
从链表上移除
static void ecb_cold
etp_worker_free (etp_worker *wrk)
{
free (wrk->tmpbuf.ptr);
wrk->next->prev = wrk->prev;
wrk->prev->next = wrk->next;
free (wrk);
}