Libeio--libeio初始化,REQ/RES队列,锁,线程

libeio数据

libeio使用的数据主要包括多线程(主线程和worker线程),锁(互斥锁和条件锁),队列(reqqueue和resqueue)。

libeio 初始化

int eio_init (void (*want_poll)(void), void (*done_poll)(void))

eioinit函数:初始化libeio库。成功返回0,失败返回-1并且设置合适的errno值。 1. 初始化libeio中的一些全局结构,比如:reqqueue,resqueue,以及各种互斥量等。 2. 保存外界传入的两个回调函数:wantpoll和done_poll。这两个函数都是边缘触发函数

etp_init (void (*want_poll)(void), void (*done_poll)(void))
{

    //初始化三个互斥量wrklock, reslock, reqlock和一个条件变量reqwait

    X_MUTEX_CREATE (wrklock);
    X_MUTEX_CREATE (reslock);
    X_MUTEX_CREATE (reqlock);
    X_COND_CREATE  (reqwait);

    /*
     *初始化两个队列:
     * 1.eio_submit把request放入req_queue
     * 2.eio_poll从res_queue中取出数据
     */

    reqq_init (&req_queue);
    reqq_init (&res_queue);

    //work线程链表

    wrk_first.next =
    wrk_first.prev = &wrk_first;

    /*
     *  number of worker threads currently running.
     */
    started  = 0; 
    /*
     * 表示处于COND_WAIT状态下的空闲线程数
     */
    idle     = 0;
    /*
     * nreqs: number of requests currently handled by libeio. 
     * This is the total number of requests that have been submitted to libeio, but not yet destroyed.
     */
    nreqs    = 0;
    /*
     * nready : number of ready requests, 
     * i.e. requests that have been submitted but have not yet entered the execution phase.
     */
    nready   = 0;
    /*
     * npending : number of pending requests, 
     * i.e. requests that have been executed and have results, 
     * but have not been finished yet by a call to eio_poll)
     */
    npending = 0;

    //设置回调函数

    want_poll_cb = want_poll;
    done_poll_cb = done_poll;

    return 0;
}

REQ队列和RES队列

etb_reqq结构体

typedef struct eio_req    eio_req;
#define ETP_REQ eio_req

/*
* a somewhat faster data structure might be nice, but
* with 8 priorities this actually needs <20 insns
* per shift, the most expensive operation.
*/
typedef struct {
    ETP_REQ *qs[ETP_NUM_PRI], *qe[ETP_NUM_PRI]; /* qstart, qend */
    int size;
} etp_reqq;

定义两个etp_reqq全局变量

static etp_reqq req_queue;
static etp_reqq res_queue;

初始化队列

static void ecb_noinline ecb_cold
reqq_init (etp_reqq *q)
{
    int pri;

    /*
     * etp_reqq包含ETP_NUM_PRT个指向ETP_REQ的指针数组
     * 每个数组成员的头指针和尾指针指向0 
     */
    for (pri = 0; pri < ETP_NUM_PRI; ++pri)
        q->qs[pri] = q->qe[pri] = 0;

    q->size = 0;
}

把request放入到队列中

static int ecb_noinline
reqq_push (etp_reqq *q, ETP_REQ *req)
{
    int pri = req->pri;
    req->next = 0;

    /*
     * 若qe[pri]已有数据则该更新队列的尾指针指向该request
     * 若无数据则队列头指针和尾指针都指向该request
     */
    if (q->qe[pri])
    {
        q->qe[pri]->next = req;
        q->qe[pri] = req;
    }
    else
        q->qe[pri] = q->qs[pri] = req;

    return q->size++;
}

把request从队列中取出

static ETP_REQ * ecb_noinline
reqq_shift (etp_reqq *q)
{
    int pri;

    if (!q->size)
        return 0;

    --q->size;

    for (pri = ETP_NUM_PRI; pri--; )
    {
        ETP_REQ *req = q->qs[pri];

        if (req)
        {
            /* 队列数据取完,头和尾指针指向0 */
            if (!(q->qs[pri] = (ETP_REQ *)req->next))
                q->qe[pri] = 0;

            return req;
        }
    }

    /* 无数据可取,异常退出 */
    abort ();
}

libeio中的锁

#include <pthread.h>
#define sigset_t int
#define sigfillset(a)
#define pthread_sigmask(a,b,c)
#define sigaddset(a,b)
#define sigemptyset(s)

typedef pthread_mutex_t xmutex_t;
#define X_MUTEX_INIT           PTHREAD_MUTEX_INITIALIZER
#define X_MUTEX_CREATE(mutex)  pthread_mutex_init (&(mutex), 0)
#define X_LOCK(mutex)          pthread_mutex_lock (&(mutex))
#define X_UNLOCK(mutex)        pthread_mutex_unlock (&(mutex))

typedef pthread_cond_t xcond_t;
#define X_COND_INIT                     PTHREAD_COND_INITIALIZER
#define X_COND_CREATE(cond)     pthread_cond_init (&(cond), 0)
#define X_COND_SIGNAL(cond)             pthread_cond_signal (&(cond))
#define X_COND_WAIT(cond,mutex)         pthread_cond_wait (&(cond), &(mutex))
#define X_COND_TIMEDWAIT(cond,mutex,to) pthread_cond_timedwait (&(cond), &(mutex), &(to))

在eiosubmit中的部分代码: 1.reqlock加锁进行数据更新和reqqpush操作然后reqlock解锁 2.发送条件锁的信号给worker线程

X_LOCK (reqlock);
++nreqs;
++nready;
reqq_push (&req_queue, req);
X_COND_SIGNAL (reqwait);
X_UNLOCK (reqlock);

etppoll中的部分代码: 1.reqlock加锁进行数据更新和reqqshift操作然后reqlock解锁

X_LOCK (reslock);
req = reqq_shift (&res_queue);
if (req)
{
    --npending;
    if (!res_queue.size && done_poll_cb)
    done_poll_cb ();
}
X_UNLOCK (reslock);

worker线程中使用XCONDWAIT或XCONDTIMEDWAIT来接受条件锁信号

if (idle <= max_idle)
    /* we are allowed to idle, so do so without any timeout */
    X_COND_WAIT (reqwait, reqlock);
else
{
    /* initialise timeout once */
    if (!ts.tv_sec)
    ts.tv_sec = time (0) + idle_timeout;

    if (X_COND_TIMEDWAIT (reqwait, reqlock, ts) == ETIMEDOUT)
        ts.tv_sec = 1; /* assuming this is not a value computed above.,.. */
}

work线程

线程初始化:

static void
etp_maybe_start_thread (void)
{
    if (ecb_expect_true (etp_nthreads () >= wanted))
        return;

    /* todo: maybe use idle here, but might be less exact */
    if (ecb_expect_true (0 <= (int)etp_nthreads () + (int)etp_npending () - (int)etp_nreqs ()))
        return;

    etp_start_thread ();
}

通过calloc分配work线程资源并挂在wrk_first双向链表上

static etp_worker wrk_first; /* NOT etp */

static void ecb_cold
etp_start_thread (void)
{
    etp_worker *wrk = calloc (1, sizeof (etp_worker));

    /*TODO*/
    assert (("unable to allocate worker thread data", wrk));

    X_LOCK (wrklock);

    if (xthread_create (&wrk->tid, etp_proc, (void *)wrk))
    {
        wrk->prev = &wrk_first;
        wrk->next = wrk_first.next;
        wrk_first.next->prev = wrk;
        wrk_first.next = wrk;
        ++started;
    }
    else
        free (wrk);

    X_UNLOCK (wrklock);
}

创建属性detached(不用考虑线程资源的回收)的线程

static int
xthread_create (xthread_t *tid, void *(*proc)(void *), void *arg)
{
    int retval;
    pthread_attr_t attr;

    pthread_attr_init (&attr);
    pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);

    retval = pthread_create (tid, &attr, proc, arg) == 0;

    pthread_attr_destroy (&attr);

    return retval;
}

从链表上移除

static void ecb_cold
etp_worker_free (etp_worker *wrk)
{
    free (wrk->tmpbuf.ptr);

    wrk->next->prev = wrk->prev;
    wrk->prev->next = wrk->next;

    free (wrk);
}