xref: /illumos-gate/usr/src/cmd/fm/fmd/common/fmd_eventq.c (revision 581cede61ac9c14d8d4ea452562a567189eead78)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 
22 /*
23  * Copyright 2006 Sun Microsystems, Inc.  All rights reserved.
24  * Use is subject to license terms.
25  */
26 
27 #pragma ident	"%Z%%M%	%I%	%E% SMI"
28 
29 #include <fmd_alloc.h>
30 #include <fmd_eventq.h>
31 #include <fmd_module.h>
32 #include <fmd_dispq.h>
33 #include <fmd_subr.h>
34 
35 #include <fmd.h>
36 
37 fmd_eventq_t *
38 fmd_eventq_create(fmd_module_t *mp, fmd_eventqstat_t *stats,
39     pthread_mutex_t *stats_lock, uint_t limit)
40 {
41 	fmd_eventq_t *eq = fmd_zalloc(sizeof (fmd_eventq_t), FMD_SLEEP);
42 
43 	(void) pthread_mutex_init(&eq->eq_lock, NULL);
44 	(void) pthread_cond_init(&eq->eq_cv, NULL);
45 
46 	eq->eq_mod = mp;
47 	eq->eq_stats = stats;
48 	eq->eq_stats_lock = stats_lock;
49 	eq->eq_limit = limit;
50 	eq->eq_sgid = fmd_dispq_getgid(fmd.d_disp, eq);
51 
52 	return (eq);
53 }
54 
55 void
56 fmd_eventq_destroy(fmd_eventq_t *eq)
57 {
58 	fmd_eventqelem_t *eqe;
59 
60 	while ((eqe = fmd_list_next(&eq->eq_list)) != NULL) {
61 		fmd_list_delete(&eq->eq_list, eqe);
62 		fmd_event_rele(eqe->eqe_event);
63 		fmd_free(eqe, sizeof (fmd_eventqelem_t));
64 	}
65 
66 	fmd_dispq_delgid(fmd.d_disp, eq->eq_sgid);
67 	fmd_free(eq, sizeof (fmd_eventq_t));
68 }
69 
70 static void
71 fmd_eventq_drop(fmd_eventq_t *eq, fmd_eventqelem_t *eqe)
72 {
73 	(void) pthread_mutex_lock(eq->eq_stats_lock);
74 	eq->eq_stats->eqs_dropped.fmds_value.ui64++;
75 	(void) pthread_mutex_unlock(eq->eq_stats_lock);
76 
77 	fmd_event_rele(eqe->eqe_event);
78 	fmd_free(eqe, sizeof (fmd_eventqelem_t));
79 }
80 
81 /*
82  * Update statistics when an event is dispatched and placed on a module's event
83  * queue.  This is essentially the same code as kstat_waitq_enter(9F).
84  */
85 static void
86 fmd_eventqstat_dispatch(fmd_eventq_t *eq)
87 {
88 	fmd_eventqstat_t *eqs = eq->eq_stats;
89 	hrtime_t new, delta;
90 	uint32_t wcnt;
91 
92 	(void) pthread_mutex_lock(eq->eq_stats_lock);
93 
94 	new = gethrtime();
95 	delta = new - eqs->eqs_wlastupdate.fmds_value.ui64;
96 	eqs->eqs_wlastupdate.fmds_value.ui64 = new;
97 	wcnt = eqs->eqs_wcnt.fmds_value.ui32++;
98 
99 	if (wcnt != 0) {
100 		eqs->eqs_wlentime.fmds_value.ui64 += delta * wcnt;
101 		eqs->eqs_wtime.fmds_value.ui64 += delta;
102 	}
103 
104 	eqs->eqs_dispatched.fmds_value.ui64++;
105 	(void) pthread_mutex_unlock(eq->eq_stats_lock);
106 }
107 
108 void
109 fmd_eventq_insert_at_head(fmd_eventq_t *eq, fmd_event_t *ep)
110 {
111 	uint_t evt = FMD_EVENT_TYPE(ep);
112 	fmd_eventqelem_t *eqe;
113 	int ok;
114 
115 	/*
116 	 * If this event queue is acting as /dev/null, bounce the reference
117 	 * count to free an unreferenced event and just return immediately.
118 	 */
119 	if (eq->eq_limit == 0) {
120 		fmd_event_hold(ep);
121 		fmd_event_rele(ep);
122 		return;
123 	}
124 
125 	eqe = fmd_alloc(sizeof (fmd_eventqelem_t), FMD_SLEEP);
126 	fmd_event_hold(ep);
127 	eqe->eqe_event = ep;
128 
129 	(void) pthread_mutex_lock(&eq->eq_lock);
130 
131 	if ((ok = eq->eq_size < eq->eq_limit || evt != FMD_EVT_PROTOCOL) != 0) {
132 		if (evt != FMD_EVT_CTL)
133 			fmd_eventqstat_dispatch(eq);
134 
135 		fmd_list_prepend(&eq->eq_list, eqe);
136 		eq->eq_size++;
137 	}
138 
139 	(void) pthread_cond_broadcast(&eq->eq_cv);
140 	(void) pthread_mutex_unlock(&eq->eq_lock);
141 
142 	if (!ok)
143 		fmd_eventq_drop(eq, eqe);
144 }
145 
146 void
147 fmd_eventq_insert_at_time(fmd_eventq_t *eq, fmd_event_t *ep)
148 {
149 	uint_t evt = FMD_EVENT_TYPE(ep);
150 	hrtime_t hrt = fmd_event_hrtime(ep);
151 	fmd_eventqelem_t *eqe, *oqe;
152 	int ok;
153 
154 	/*
155 	 * If this event queue is acting as /dev/null, bounce the reference
156 	 * count to free an unreferenced event and just return immediately.
157 	 */
158 	if (eq->eq_limit == 0) {
159 		fmd_event_hold(ep);
160 		fmd_event_rele(ep);
161 		return;
162 	}
163 
164 	eqe = fmd_alloc(sizeof (fmd_eventqelem_t), FMD_SLEEP);
165 	fmd_event_hold(ep);
166 	eqe->eqe_event = ep;
167 
168 	(void) pthread_mutex_lock(&eq->eq_lock);
169 
170 	/*
171 	 * fmd makes no guarantees that events will be delivered in time order
172 	 * because its transport can make no such guarantees.  Instead we make
173 	 * a looser guarantee that an enqueued event will be dequeued before
174 	 * any newer *pending* events according to event time.  This permits us
175 	 * to state, for example, that a timer expiry event will be delivered
176 	 * prior to any enqueued event whose time is after the timer expired.
177 	 * We use a simple insertion sort for this task, as queue lengths are
178 	 * typically short and events do *tend* to be received chronologically.
179 	 */
180 	for (oqe = fmd_list_prev(&eq->eq_list); oqe; oqe = fmd_list_prev(oqe)) {
181 		if (hrt >= fmd_event_hrtime(oqe->eqe_event))
182 			break; /* 'ep' is newer than the event in 'oqe' */
183 	}
184 
185 	if ((ok = eq->eq_size < eq->eq_limit || evt != FMD_EVT_PROTOCOL) != 0) {
186 		if (evt != FMD_EVT_CTL)
187 			fmd_eventqstat_dispatch(eq);
188 
189 		if (oqe == NULL)
190 			fmd_list_prepend(&eq->eq_list, eqe);
191 		else
192 			fmd_list_insert_after(&eq->eq_list, oqe, eqe);
193 		eq->eq_size++;
194 	}
195 
196 	(void) pthread_cond_broadcast(&eq->eq_cv);
197 	(void) pthread_mutex_unlock(&eq->eq_lock);
198 
199 	if (!ok)
200 		fmd_eventq_drop(eq, eqe);
201 }
202 
203 fmd_event_t *
204 fmd_eventq_delete(fmd_eventq_t *eq)
205 {
206 	fmd_eventqstat_t *eqs = eq->eq_stats;
207 	hrtime_t new, delta;
208 	uint32_t wcnt;
209 
210 	fmd_eventqelem_t *eqe;
211 	fmd_event_t *ep;
212 top:
213 	(void) pthread_mutex_lock(&eq->eq_lock);
214 
215 	while (!(eq->eq_flags & FMD_EVENTQ_ABORT) &&
216 	    (eq->eq_size == 0 || (eq->eq_flags & FMD_EVENTQ_SUSPEND)))
217 		(void) pthread_cond_wait(&eq->eq_cv, &eq->eq_lock);
218 
219 	if (eq->eq_flags & FMD_EVENTQ_ABORT) {
220 		(void) pthread_mutex_unlock(&eq->eq_lock);
221 		return (NULL);
222 	}
223 
224 	eqe = fmd_list_next(&eq->eq_list);
225 	fmd_list_delete(&eq->eq_list, eqe);
226 	eq->eq_size--;
227 
228 	(void) pthread_mutex_unlock(&eq->eq_lock);
229 
230 	ep = eqe->eqe_event;
231 	fmd_free(eqe, sizeof (fmd_eventqelem_t));
232 
233 	/*
234 	 * If we dequeued a control event, release it and go back to sleep.
235 	 * fmd_event_rele() on the event will block as described in fmd_ctl.c.
236 	 * This effectively renders control events invisible to our callers
237 	 * as well as to statistics and observability tools (e.g. fmstat(1M)).
238 	 */
239 	if (FMD_EVENT_TYPE(ep) == FMD_EVT_CTL) {
240 		fmd_event_rele(ep);
241 		goto top;
242 	}
243 
244 	/*
245 	 * Before returning, update our statistics.  This code is essentially
246 	 * kstat_waitq_to_runq(9F), except simplified because our queues are
247 	 * always consumed by a single thread (i.e. runq len == 1).
248 	 */
249 	(void) pthread_mutex_lock(eq->eq_stats_lock);
250 
251 	new = gethrtime();
252 	delta = new - eqs->eqs_wlastupdate.fmds_value.ui64;
253 
254 	eqs->eqs_wlastupdate.fmds_value.ui64 = new;
255 	eqs->eqs_dlastupdate.fmds_value.ui64 = new;
256 
257 	ASSERT(eqs->eqs_wcnt.fmds_value.ui32 != 0);
258 	wcnt = eqs->eqs_wcnt.fmds_value.ui32--;
259 
260 	eqs->eqs_wlentime.fmds_value.ui64 += delta * wcnt;
261 	eqs->eqs_wtime.fmds_value.ui64 += delta;
262 
263 	if (FMD_EVENT_TYPE(ep) == FMD_EVT_PROTOCOL)
264 		eqs->eqs_prdequeued.fmds_value.ui64++;
265 
266 	eqs->eqs_dequeued.fmds_value.ui64++;
267 	(void) pthread_mutex_unlock(eq->eq_stats_lock);
268 
269 	return (ep);
270 }
271 
272 /*
273  * Update statistics when an event is done being processed by the eventq's
274  * consumer thread.  This is essentially kstat_runq_exit(9F) simplified for
275  * our principle that a single thread consumes the queue (i.e. runq len == 1).
276  */
277 void
278 fmd_eventq_done(fmd_eventq_t *eq)
279 {
280 	fmd_eventqstat_t *eqs = eq->eq_stats;
281 	hrtime_t new, delta;
282 
283 	(void) pthread_mutex_lock(eq->eq_stats_lock);
284 
285 	new = gethrtime();
286 	delta = new - eqs->eqs_dlastupdate.fmds_value.ui64;
287 
288 	eqs->eqs_dlastupdate.fmds_value.ui64 = new;
289 	eqs->eqs_dtime.fmds_value.ui64 += delta;
290 
291 	(void) pthread_mutex_unlock(eq->eq_stats_lock);
292 }
293 
294 void
295 fmd_eventq_cancel(fmd_eventq_t *eq, uint_t type, void *data)
296 {
297 	fmd_eventqelem_t *eqe, *nqe;
298 
299 	(void) pthread_mutex_lock(&eq->eq_lock);
300 
301 	for (eqe = fmd_list_next(&eq->eq_list); eqe != NULL; eqe = nqe) {
302 		nqe = fmd_list_next(eqe);
303 
304 		if (fmd_event_match(eqe->eqe_event, type, data)) {
305 			fmd_list_delete(&eq->eq_list, eqe);
306 			eq->eq_size--;
307 			fmd_event_rele(eqe->eqe_event);
308 			fmd_free(eqe, sizeof (fmd_eventqelem_t));
309 		}
310 	}
311 
312 	(void) pthread_mutex_unlock(&eq->eq_lock);
313 }
314 
315 void
316 fmd_eventq_suspend(fmd_eventq_t *eq)
317 {
318 	(void) pthread_mutex_lock(&eq->eq_lock);
319 	eq->eq_flags |= FMD_EVENTQ_SUSPEND;
320 	(void) pthread_mutex_unlock(&eq->eq_lock);
321 }
322 
323 void
324 fmd_eventq_resume(fmd_eventq_t *eq)
325 {
326 	(void) pthread_mutex_lock(&eq->eq_lock);
327 	eq->eq_flags &= ~FMD_EVENTQ_SUSPEND;
328 	(void) pthread_cond_broadcast(&eq->eq_cv);
329 	(void) pthread_mutex_unlock(&eq->eq_lock);
330 }
331 
332 void
333 fmd_eventq_abort(fmd_eventq_t *eq)
334 {
335 	fmd_eventqelem_t *eqe;
336 
337 	(void) pthread_mutex_lock(&eq->eq_lock);
338 
339 	while ((eqe = fmd_list_next(&eq->eq_list)) != NULL) {
340 		fmd_list_delete(&eq->eq_list, eqe);
341 		fmd_event_rele(eqe->eqe_event);
342 		fmd_free(eqe, sizeof (fmd_eventqelem_t));
343 	}
344 
345 	eq->eq_flags |= FMD_EVENTQ_ABORT;
346 	(void) pthread_cond_broadcast(&eq->eq_cv);
347 	(void) pthread_mutex_unlock(&eq->eq_lock);
348 }
349