Threadpools

A thread pool is a set of threads that execute load requests.

It consists of two types of threads:
- A control thread, which processes step data and distributes requests across workers
- Worker threads, which run the requests

Variables

tp_worker_min_sleep


tunable: the minimum time a worker (or the control thread) will sleep while waiting for a request to arrive.
Because OS scheduling is granular, if we go to sleep for, say, 100us, we may wake up
only after 178us: nanosleep is not precise (though the OS may use a high-resolution timer).

So TSLoad prefers an early arrival and skips sleeping if the time interval is less than tp_worker_min_sleep.

ts_time_t tp_worker_min_sleep 

tp_worker_overhead


tunable: the estimated time consumed by a worker between handling an arrival and calling the module's
mod_run_request method. Like tp_worker_min_sleep, it is used to make arrivals more precise.

ts_time_t tp_worker_overhead 

Functions

tp_create

public


Create a new thread pool

ARGUMENTS

LIBEXPORT thread_pool_t* tp_create(const char* name, unsigned num_threads, ts_time_t quantum, boolean_t discard, struct tp_disp* disp)

tp_destroy

public

LIBEXPORT void tp_destroy(thread_pool_t* tp)

tp_search

public

LIBEXPORT thread_pool_t* tp_search(const char* name)

tp_attach


Insert workload into thread pool's list

void tp_attach(thread_pool_t* tp, struct workload* wl)

tp_detach


Remove workload from thread pool list

void tp_detach(thread_pool_t* tp, struct workload* wl)

tp_compare_requests


Compare two requests. Returns >= 0 if request rq2 should go after rq1

int tp_compare_requests(struct request* rq1, struct request* rq2)

tp_insert_request_impl


Insert request into the request queue, keeping requests sorted by their sched-time.
Does not walk the entire list: uses prev_rq and next_rq as hint parameters.

I.e. if the queue holds requests with schedule times 10, 20, 30, 40, 50 and 60
and we want to add a request from another workload with sched time 45, the prev
hint initially points to 60 and we walk back until prev_rq = 40 and next_rq = 50.
Since the hints are updated after insertion, a subsequent request with sched
time 55 can also be inserted without walking the entire list.

ARGUMENTS

void tp_insert_request_impl(list_head_t* rq_list, list_node_t* rq_node, list_node_t** p_prev_node, list_node_t** p_next_node, ptrdiff_t offset)

tp_insert_request_initnodes


Make preliminary initialization of the previous and next nodes if the
list already contains requests. Set the prev and next nodes to NULL
otherwise.

void tp_insert_request_initnodes(list_head_t* rq_list, list_node_t** p_prev_node, list_node_t** p_next_node)

tp_distribute_requests


Create request instances according to step data, or attach
trace-based requests to the threadpool request queue. Automatically sorts
requests by their arrival time.

Distribution across workers is actually done by the threadpool dispatcher.

void tp_distribute_requests(struct workload_step* step, thread_pool_t* tp)

tp_fini, tp_init

public

LIBEXPORT int tp_init(void)
LIBEXPORT void tp_fini(void)

tp_rele

tp_rele may be called from a control/worker thread. In that case destroying
the pool could deadlock, because the thread would end up joining itself.
So we do not destroy tp in this case - destruction is left to the collector/tp_fini.

ARGUMENTS

void tp_rele(thread_pool_t* tp, boolean_t may_destroy)

control_thread


Control thread

The control thread notifies workers after each quantum ends
and processes each step of each workload on the thread pool

thread_result_t control_thread(thread_arg_t arg) 

worker_thread


Worker thread

thread_result_t worker_thread(thread_arg_t arg) 

Types

typedef struct tp_worker


Threadpool worker

MEMBERS

typedef struct tp_worker {
    struct thread_pool* w_tp;
    thread_t w_thread;

    thread_mutex_t w_rq_mutex;
    thread_cv_t w_rq_cv;
    list_head_t w_rq_head;

    void* w_tpd_data;
} tp_worker_t;

typedef struct thread_pool


Threadpool main descriptor

MEMBERS

typedef struct thread_pool {
    unsigned tp_num_threads;
    AUTOSTRING char* tp_name;

    boolean_t tp_is_dead;
    boolean_t tp_started;

    ts_time_t tp_quantum;
    ts_time_t tp_time;

    thread_t  tp_ctl_thread;
    tp_worker_t* tp_workers;

    thread_mutex_t tp_mutex;
    atomic_t       tp_ref_count;

    struct tp_disp* tp_disp;
    boolean_t tp_discard;

    list_head_t       tp_rq_head;

    list_head_t       tp_wl_head;
    int tp_wl_count;
    boolean_t tp_wl_changed;

    struct thread_pool* tp_next;
} thread_pool_t;