ThreadPool Dispatchers
ThreadPool dispatcher is set of hooks that simulate request arrivals,
distributes requests between workers and reports finished requests.
Hooks (see tp_disp_class
):
-
preinit - actually called by json_tp_disp_proc factory to set params
-
init/destroy - initialize/destroy private tpd data
-
proc_tsobj - process TSObject to set tpd parameter. May be set to NULL.
-
control_report - called when control thread wants to report data
When discard policy is set, should clear worker queues and report
all requests to reporter thread. If not, should split request list into
two and report only finished requests.
-
control_sleep - called when control thread finished generating requests
May simulate request arrivals
-
worker_pick - called when worker wants to pick next request from queue
-
worker_done - called when worker finished executing request
-
worker_finish - special hook for tp_destroy() code. If dispatcher uses
external cv's, should wakeup worker because threadpool is dying.
-
relink_request is called when request's rq_sched_time changes and it should
be again linked to maintain queue sorted.
Benchmark dispatcher
Based on first-free dispatcher, but much simpler. Dispatching is handled by worker threads.
tpd_bench_t contains cyclic list of requests bench_rq_list (copy of tp_rq_head list), and
each worker picks next entry from that list, clones it and runs request. On step end,
tpd_worker_pick_bench swaps lists.
First-Free thread pool dispatcher
Dispatches request to first worker that finishes it's request or
to random worker that sleeping.
Queue-based dispatcher classes
Starting each step, pre-distributes requests among workers then sleeps
Has four options:
- Round-robin
- Random
- Fill up to N requests
- Per-user
Constants
TPD_ERROR, TPD_OK, TPD_BAD
ThreadPool dispatcher error code
#define TPD_OK 0 #define TPD_ERROR -1 #define TPD_BAD -2
Functions
tpd_wqueue_pick
Wait until somebody put request onto worker's queue than return
this request. If threadpool dies, returns NULL.
request_t* tpd_wqueue_pick(thread_pool_t* tp, tp_worker_t* worker)
tpd_wqueue_put
Put single request onto queue of worker and wake up worker
void tpd_wqueue_put(thread_pool_t* tp, tp_worker_t* worker, request_t* rq)
tpd_next_wid_rr
public
static int tpd_next_wid_rr(thread_pool_t* tp, int wid, request_t* rq)
tpd_first_wid_rand
public
STATIC_INLINE int tpd_first_wid_rand(thread_pool_t* tp)
tpd_next_wid_rand
public
static int tpd_next_wid_rand(thread_pool_t* tp, int wid, request_t* rq)
tpd_destroy
public
LIBEXPORT void tpd_destroy(tp_disp_t* tpd)
tpdisp_unregister, tpdisp_register
public
LIBEXPORT int tpdisp_register(module_t* mod, tp_disp_class_t* class) LIBEXPORT int tpdisp_unregister(module_t* mod, tp_disp_class_t* class)
tpdisp_fini, tpdisp_init
public
LIBEXPORT int tpdisp_init(void) LIBEXPORT void tpdisp_fini(void)
Types
typedef struct tp_disp_class
typedef struct tp_disp_class { AUTOSTRING char* name; const char* description; struct tsload_param* params; int (*init)(thread_pool_t* tp); void (*destroy)(thread_pool_t* tp); int (*proc_tsobj)(struct tp_disp* tpd, tsobj_node_t* node); void (*control_report)(thread_pool_t* tp); void (*control_sleep)(thread_pool_t* tp); request_t* (*worker_pick)(thread_pool_t* tp, tp_worker_t* worker); void (*worker_done)(thread_pool_t* tp, tp_worker_t* worker, request_t* rq); void (*worker_signal)(thread_pool_t* tp, int wid); void (*relink_request)(thread_pool_t* tp, request_t* rq); struct tp_disp_class* next; module_t* mod; } tp_disp_class_t;
typedef struct tp_disp
typedef struct tp_disp { thread_pool_t* tpd_tp; tp_disp_class_t* tpd_class; void* tpd_data; } tp_disp_t;