Threadpools
A thread pool is a set of threads that execute load requests.
It consists of two types of threads:
- A control thread, which processes step data and distributes requests across workers
- Worker threads, which run the requests
Variables
tp_worker_min_sleep
tunable: minimum time a worker (or the control thread) will sleep while waiting for a request to arrive.
Because OS scheduling is granular, a sleep requested for, say, 100us may end only after 178us,
since nanosleep is not precise (although the OS may use a high-resolution timer).
TSLoad may therefore prefer an early arrival and skip sleeping if the time interval is shorter than tp_worker_min_sleep.
ts_time_t tp_worker_min_sleep
tp_worker_overhead
tunable: estimated time a worker spends between handling an arrival and calling the module's
mod_run_request method. Like tp_worker_min_sleep, it is used to make arrivals more precise.
ts_time_t tp_worker_overhead
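The way the two tunables could interact can be sketched as follows. This is a minimal illustration, not TSLoad's actual code: the helper worker_sleep_time, the default values, the signed nanosecond definition of ts_time_t, and the choice to subtract the overhead before comparing against the minimum sleep are all assumptions made for the example.

```c
#include <assert.h>
#include <stdint.h>

/* Assumption: time is a signed nanosecond count in this sketch. */
typedef int64_t ts_time_t;

/* Hypothetical default values, chosen only for illustration. */
static ts_time_t tp_worker_min_sleep = 200 * 1000;  /* 200us */
static ts_time_t tp_worker_overhead  = 30 * 1000;   /* 30us  */

/* Hypothetical helper: returns how long a worker should sleep before
 * handling a request scheduled at `arrival`, or 0 if it should not
 * sleep at all and start the request early. */
static ts_time_t worker_sleep_time(ts_time_t now, ts_time_t arrival)
{
    /* Wake up early enough to absorb the estimated dispatch overhead. */
    ts_time_t delta = arrival - now - tp_worker_overhead;

    /* An interval this short cannot be slept precisely (this also
     * covers requests that are already late), so skip the sleep. */
    if (delta < tp_worker_min_sleep)
        return 0;

    return delta;
}
```

With these example values, a request due in 100us is started immediately, while a request due in 1ms is preceded by a 970us sleep.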
Functions
tp_create
public
Create new thread pool
ARGUMENTS
-
num_threads - Number of worker threads in this pool
-
name - Name of thread pool
-
quantum - Worker's quantum
LIBEXPORT thread_pool_t* tp_create(const char* name, unsigned num_threads, ts_time_t quantum, boolean_t discard, struct tp_disp* disp)
tp_destroy
public
LIBEXPORT void tp_destroy(thread_pool_t* tp)
tp_search
public
LIBEXPORT thread_pool_t* tp_search(const char* name)
tp_attach
Insert workload into thread pool's list
void tp_attach(thread_pool_t* tp, struct workload* wl)
tp_detach
Remove workload from thread pool list
void tp_detach(thread_pool_t* tp, struct workload* wl)
tp_compare_requests
Compares two requests. Returns a value >= 0 if request rq2 goes after rq1.
int tp_compare_requests(struct request* rq1, struct request* rq2)
tp_insert_request_impl
Inserts a request into the request queue, keeping requests sorted by their schedule time.
Does not walk the entire list; instead it uses prev_rq and next_rq as hints.
For example, if the queue holds requests with schedule times 10, 20, 30, 40, 50 and 60
and we want to add a request from another workload with schedule time 45, the previous-request
hint should point to 60, and we walk back from it. This yields prev_rq = 40 and next_rq = 50,
so a subsequent insert of a request with time 55 does not need to walk the entire list.
ARGUMENTS
-
rq_list - request queue
-
rq - request to be inserted
-
p_prev_rq - hint that contains previous request inside queue
-
p_next_rq - hint that contains next request inside queue
void tp_insert_request_impl(list_head_t* rq_list, list_node_t* rq_node, list_node_t** p_prev_node, list_node_t** p_next_node, ptrdiff_t offset)
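The hinted insertion described above can be sketched with a simplified list. This is an illustration under stated assumptions, not the TSLoad implementation: node_t, queue_t, queue_init and queue_insert_hinted are hypothetical names, a single hint pointer stands in for the prev/next pair, and the function returns the number of nodes it walked purely so the saving can be observed.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical node type: sched_time stands in for a request's schedule time. */
typedef struct node {
    struct node *prev, *next;
    long sched_time;
} node_t;

/* Circular doubly linked list with a sentinel head (hypothetical type). */
typedef struct { node_t head; } queue_t;

static void queue_init(queue_t *q)
{
    q->head.prev = q->head.next = &q->head;
}

/* Insert rq so that the queue stays sorted by sched_time.
 * *hint is the node inserted by the previous call, or NULL to start
 * from the tail. Returns the number of nodes walked. */
static int queue_insert_hinted(queue_t *q, node_t *rq, node_t **hint)
{
    node_t *pos = (*hint != NULL) ? *hint : q->head.prev;
    int steps = 0;

    assert(rq != NULL);

    /* Walk back while the current node is scheduled later than rq. */
    while (pos != &q->head && pos->sched_time > rq->sched_time) {
        pos = pos->prev;
        ++steps;
    }
    /* Walk forward while the next node is scheduled no later than rq. */
    while (pos->next != &q->head && pos->next->sched_time <= rq->sched_time) {
        pos = pos->next;
        ++steps;
    }

    /* Link rq right after pos. */
    rq->prev = pos;
    rq->next = pos->next;
    pos->next->prev = rq;
    pos->next = rq;

    *hint = rq;  /* the next insert starts its search here */
    return steps;
}
```

For the queue 10, 20, 30, 40, 50, 60, inserting 45 from the tail walks two nodes back, and the following insert of 55 walks a single node forward from the hint instead of rescanning the list.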
tp_insert_request_initnodes
Performs preliminary initialization of the previous and next nodes if
the list already contains requests. Sets the previous and next nodes to NULL
otherwise.
void tp_insert_request_initnodes(list_head_t* rq_list, list_node_t** p_prev_node, list_node_t** p_next_node)
tp_distribute_requests
Creates request instances according to step data, or attaches
trace-based requests to the threadpool request queue. Automatically sorts
requests by their arrival time.
Distribution across workers is actually done by the threadpool dispatcher.
void tp_distribute_requests(struct workload_step* step, thread_pool_t* tp)
tp_fini, tp_init
public
LIBEXPORT int tp_init(void)
LIBEXPORT void tp_fini(void)
tp_rele
tp_rele may sometimes be called from a control or worker thread.
In that case destroying the pool could deadlock, because the thread would join itself.
Do not destroy the tp in this case; leave it to the collector or tp_fini.
ARGUMENTS
-
may_destroy - whether tp_rele may free the thread_pool
void tp_rele(thread_pool_t* tp, boolean_t may_destroy)
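The deferred-destruction rule can be illustrated with a small single-threaded sketch. Everything here is an assumption made for the example: pool_t, pool_hold, pool_rele and pool_collect are hypothetical names, a plain int replaces the atomic reference counter, and a destroyed flag replaces the actual thread joins and memory release.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical, simplified pool descriptor. */
typedef struct pool {
    int  ref_count;  /* plain int here; the real counter is atomic */
    bool destroyed;  /* marker used instead of really freeing the pool */
} pool_t;

static void pool_hold(pool_t *tp)
{
    ++tp->ref_count;
}

/* Drop one reference. Returns true if the pool was destroyed by this call.
 * When may_destroy is false (for example, the caller is one of the pool's
 * own threads and destroying would mean joining itself), the pool is left
 * alive with zero references for a later cleanup pass. */
static bool pool_rele(pool_t *tp, bool may_destroy)
{
    assert(tp->ref_count > 0);

    if (--tp->ref_count > 0)
        return false;  /* still referenced elsewhere */
    if (!may_destroy)
        return false;  /* defer destruction to the collector */

    tp->destroyed = true;
    return true;
}

/* Cleanup pass run from a thread that does not belong to the pool:
 * destroys a pool whose last reference was dropped with may_destroy
 * set to false. Returns true if it destroyed the pool. */
static bool pool_collect(pool_t *tp)
{
    if (tp->ref_count == 0 && !tp->destroyed) {
        tp->destroyed = true;
        return true;
    }
    return false;
}
```

The point of the split is that the last reference can be dropped from any thread, while the destruction itself only ever runs on a thread that is safe to join the pool's threads from.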
control_thread
Control thread
The control thread notifies workers at the end of each quantum
and processes each step for each workload in the thread pool
thread_result_t control_thread(thread_arg_t arg)
worker_thread
Worker thread
thread_result_t worker_thread(thread_arg_t arg)
Types
typedef struct tp_worker
Threadpool worker
MEMBERS
-
w_tp - backward link to threadpool
-
w_thread - associated thread
-
w_rq_mutex - mutex that protects cv and queue of requests
-
w_rq_cv - condition variable for notifying sleeping worker
-
w_rq_head - list of requests attached to this worker
-
w_tpd_data - threadpool dispatcher per-worker data field
typedef struct tp_worker {
    struct thread_pool* w_tp;
    thread_t w_thread;
    thread_mutex_t w_rq_mutex;
    thread_cv_t w_rq_cv;
    list_head_t w_rq_head;
    void* w_tpd_data;
} tp_worker_t;
typedef struct thread_pool
Threadpool main descriptor
MEMBERS
-
tp_num_threads - number of worker threads
-
tp_name - threadpool name
-
tp_is_dead - flag that is set when the threadpool is destroyed
-
tp_started - internal flag indicating that tp_create has already started the threads
-
tp_quantum - control thread's quantum duration (in ns)
-
tp_time - last time the control thread woke up (in ns)
-
tp_ctl_thread - control thread of threadpool
-
tp_workers - array of threadpool workers
-
tp_mutex - mutex that protects list of workloads attached to threadpool
-
tp_ref_count - reference counter for threadpool
-
tp_disp - pointer to dispatcher structure
-
tp_discard - see discard parameter for tsload_create_threadpool
-
tp_rq_head - list of requests that are being executed by this threadpool
-
tp_wl_head - list of workloads attached to this threadpool
typedef struct thread_pool {
    unsigned tp_num_threads;
    AUTOSTRING char* tp_name;
    boolean_t tp_is_dead;
    boolean_t tp_started;
    ts_time_t tp_quantum;
    ts_time_t tp_time;
    thread_t tp_ctl_thread;
    tp_worker_t* tp_workers;
    thread_mutex_t tp_mutex;
    atomic_t tp_ref_count;
    struct tp_disp* tp_disp;
    boolean_t tp_discard;
    list_head_t tp_rq_head;
    list_head_t tp_wl_head;
    int tp_wl_count;
    boolean_t tp_wl_changed;
    struct thread_pool* tp_next;
} thread_pool_t;