ThreadPool Dispatchers

ThreadPool dispatcher is set of hooks that simulate request arrivals,
distributes requests between workers and reports finished requests.

Hooks (see tp_disp_class):

Benchmark dispatcher

Based on first-free dispatcher, but much simpler. Dispatching is handled by worker threads.

tpd_bench_t contains cyclic list of requests bench_rq_list (copy of tp_rq_head list), and
each worker picks next entry from that list, clones it and runs request. On step end,
tpd_worker_pick_bench swaps lists.

First-Free thread pool dispatcher

Dispatches request to first worker that finishes it's request or
to random worker that sleeping.

Queue-based dispatcher classes

Starting each step, pre-distributes requests among workers then sleeps
Has four options:
- Round-robin
- Random
- Fill up to N requests
- Per-user

Constants

TPD_ERROR, TPD_OK, TPD_BAD


ThreadPool dispatcher error code

#define TPD_OK  0
#define TPD_ERROR   -1
#define TPD_BAD -2

Functions

tpd_wqueue_pick


Wait until somebody put request onto worker's queue than return
this request. If threadpool dies, returns NULL.

request_t* tpd_wqueue_pick(thread_pool_t* tp, tp_worker_t* worker)

tpd_wqueue_put


Put single request onto queue of worker and wake up worker

void tpd_wqueue_put(thread_pool_t* tp, tp_worker_t* worker, request_t* rq)

tpd_next_wid_rr

public

static int tpd_next_wid_rr(thread_pool_t* tp, int wid, request_t* rq) 

tpd_first_wid_rand

public

STATIC_INLINE int tpd_first_wid_rand(thread_pool_t* tp) 

tpd_next_wid_rand

public

static int tpd_next_wid_rand(thread_pool_t* tp, int wid, request_t* rq) 

tpd_destroy

public

LIBEXPORT void tpd_destroy(tp_disp_t* tpd)

tpdisp_unregister, tpdisp_register

public

LIBEXPORT int tpdisp_register(module_t* mod, tp_disp_class_t* class)
LIBEXPORT int tpdisp_unregister(module_t* mod, tp_disp_class_t* class)

tpdisp_fini, tpdisp_init

public

LIBEXPORT int tpdisp_init(void)
LIBEXPORT void tpdisp_fini(void)

Types

typedef struct tp_disp_class

typedef struct tp_disp_class {
    AUTOSTRING char* name;
    const char* description;
    struct tsload_param* params;

    int (*init)(thread_pool_t* tp);
    void (*destroy)(thread_pool_t* tp);
    int (*proc_tsobj)(struct tp_disp* tpd, tsobj_node_t* node);

    void (*control_report)(thread_pool_t* tp);
    void (*control_sleep)(thread_pool_t* tp);

    request_t* (*worker_pick)(thread_pool_t* tp, tp_worker_t* worker);
    void (*worker_done)(thread_pool_t* tp, tp_worker_t* worker, request_t* rq);
    void (*worker_signal)(thread_pool_t* tp, int wid);

    void (*relink_request)(thread_pool_t* tp, request_t* rq);
    
    struct tp_disp_class* next;
    module_t* mod;
} tp_disp_class_t;

typedef struct tp_disp

typedef struct tp_disp {
    thread_pool_t*    tpd_tp;
    tp_disp_class_t* tpd_class;
    void* tpd_data;
} tp_disp_t;