1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
|
/*
* ntp_worker.h
*/
#ifndef NTP_WORKER_H
#define NTP_WORKER_H
#include "ntp_workimpl.h"
#ifdef WORKER
# if defined(WORK_THREAD) && defined(WORK_PIPE)
# ifdef HAVE_SEMAPHORE_H
# include <semaphore.h>
# endif
# endif
#include "ntp_stdlib.h"
/* #define TEST_BLOCKING_WORKER */ /* ntp_config.c ntp_intres.c */
typedef enum blocking_work_req_tag {
BLOCKING_GETNAMEINFO,
BLOCKING_GETADDRINFO,
} blocking_work_req;
typedef void (*blocking_work_callback)(blocking_work_req, void *, size_t, void *);
typedef enum blocking_magic_sig_e {
BLOCKING_REQ_MAGIC = 0x510c7ecf,
BLOCKING_RESP_MAGIC = 0x510c7e54,
} blocking_magic_sig;
/*
* The same header is used for both requests to and responses from
* the child. In the child, done_func and context are opaque.
*/
typedef struct blocking_pipe_header_tag {
size_t octets;
blocking_magic_sig magic_sig;
blocking_work_req rtype;
u_int child_idx;
blocking_work_callback done_func;
void * context;
} blocking_pipe_header;
# ifdef WORK_THREAD
# ifdef SYS_WINNT
typedef struct { HANDLE thnd; } thread_type;
typedef struct { HANDLE shnd; } sema_type;
# else
typedef pthread_t thread_type;
typedef sem_t sema_type;
# endif
typedef thread_type *thr_ref;
typedef sema_type *sem_ref;
# endif
/*
*
*/
#if defined(WORK_FORK)
typedef struct blocking_child_tag {
int reusable;
int pid;
int req_write_pipe; /* parent */
int resp_read_pipe;
void * resp_read_ctx;
int req_read_pipe; /* child */
int resp_write_pipe;
int ispipe;
volatile u_int resp_ready_seen; /* signal/scan */
volatile u_int resp_ready_done; /* consumer/mainloop */
} blocking_child;
#elif defined(WORK_THREAD)
typedef struct blocking_child_tag {
/*
* blocking workitems and blocking_responses are
* dynamically-sized one-dimensional arrays of pointers to
* blocking worker requests and responses.
*
* IMPORTANT: This structure is shared between threads, and all
* access that is not atomic (especially queue operations) must
* hold the 'accesslock' semaphore to avoid data races.
*
* The resource management (thread/semaphore
* creation/destruction) functions and functions just testing a
* handle are safe because these are only changed by the main
* thread when no worker is running on the same data structure.
*/
int reusable;
sem_ref accesslock; /* shared access lock */
thr_ref thread_ref; /* thread 'handle' */
/* the reuest queue */
blocking_pipe_header ** volatile
workitems;
volatile size_t workitems_alloc;
size_t head_workitem; /* parent */
size_t tail_workitem; /* child */
sem_ref workitems_pending; /* signalling */
/* the response queue */
blocking_pipe_header ** volatile
responses;
volatile size_t responses_alloc;
size_t head_response; /* child */
size_t tail_response; /* parent */
/* event handles / sem_t pointers */
sem_ref wake_scheduled_sleep;
/* some systems use a pipe for notification, others a semaphore.
* Both employ the queue above for the actual data transfer.
*/
#ifdef WORK_PIPE
int resp_read_pipe; /* parent */
int resp_write_pipe; /* child */
int ispipe;
void * resp_read_ctx; /* child */
volatile u_int resp_ready_seen; /* signal/scan */
volatile u_int resp_ready_done; /* consumer/mainloop */
#else
sem_ref responses_pending; /* signalling */
#endif
sema_type sem_table[4];
thread_type thr_table[1];
} blocking_child;
#endif /* WORK_THREAD */
/* we need some global tag to indicate any blocking child may be ready: */
extern volatile u_int blocking_child_ready_seen;/* signal/scan */
extern volatile u_int blocking_child_ready_done;/* consumer/mainloop */
extern blocking_child ** blocking_children;
extern size_t blocking_children_alloc;
extern int worker_per_query; /* boolean */
extern int intres_req_pending;
extern u_int available_blocking_child_slot(void);
extern int queue_blocking_request(blocking_work_req, void *,
size_t, blocking_work_callback,
void *);
extern int queue_blocking_response(blocking_child *,
blocking_pipe_header *, size_t,
const blocking_pipe_header *);
extern void process_blocking_resp(blocking_child *);
extern void harvest_blocking_responses(void);
extern int send_blocking_req_internal(blocking_child *,
blocking_pipe_header *,
void *);
extern int send_blocking_resp_internal(blocking_child *,
blocking_pipe_header *);
extern blocking_pipe_header *
receive_blocking_req_internal(blocking_child *);
extern blocking_pipe_header *
receive_blocking_resp_internal(blocking_child *);
extern int blocking_child_common(blocking_child *);
extern void exit_worker(int)
__attribute__ ((__noreturn__));
extern int worker_sleep(blocking_child *, time_t);
extern void worker_idle_timer_fired(void);
extern void interrupt_worker_sleep(void);
extern int req_child_exit(blocking_child *);
#ifndef HAVE_IO_COMPLETION_PORT
extern int pipe_socketpair(int fds[2], int *is_pipe);
extern void close_all_beyond(int);
extern void close_all_except(int);
extern void kill_asyncio (int);
#endif
# ifdef WORK_PIPE
typedef void (*addremove_io_fd_func)(int, int, int);
extern addremove_io_fd_func addremove_io_fd;
# else
extern void handle_blocking_resp_sem(void *);
typedef void (*addremove_io_semaphore_func)(sem_ref, int);
extern addremove_io_semaphore_func addremove_io_semaphore;
# endif
# ifdef WORK_FORK
extern int worker_process;
# endif
#endif /* WORKER */
#if defined(HAVE_DROPROOT) && defined(WORK_FORK)
extern void fork_deferred_worker(void);
#else
# define fork_deferred_worker() do {} while (0)
#endif
#endif /* !NTP_WORKER_H */
|