xref: /illumos-gate/usr/src/lib/libslp/clib/slp_queue.c (revision 581cede61ac9c14d8d4ea452562a567189eead78)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License, Version 1.0 only
6  * (the "License").  You may not use this file except in compliance
7  * with the License.
8  *
9  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
10  * or http://www.opensolaris.org/os/licensing.
11  * See the License for the specific language governing permissions
12  * and limitations under the License.
13  *
14  * When distributing Covered Code, include this CDDL HEADER in each
15  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
16  * If applicable, add the following below this CDDL HEADER, with the
17  * fields enclosed by brackets "[]" replaced with your own identifying
18  * information: Portions Copyright [yyyy] [name of copyright owner]
19  *
20  * CDDL HEADER END
21  */
22 /*
23  * Copyright 2004 Sun Microsystems, Inc.  All rights reserved.
24  * Use is subject to license terms.
25  */
26 
27 #pragma ident	"%Z%%M%	%I%	%E% SMI"
28 
29 /*
30  * A synchronized FIFO queue for inter-thread producer-consumer semantics.
31  * This queue will handle multiple writers and readers simultaneously.
32  *
33  * The following operations are provided:
34  * slp_new_queue:	create a new queue
35  * slp_enqueue:		place a message at the end of the queue
36  * slp_enqueue_at_head:	place a message at the start of the queue
37  * slp_dequeue:		remove and return the next message on the queue
38  *				(waits indefinitely)
39  * slp_dequeue_timed:	remove and return the next message on the queue
40  *				(waits only for a specified time)
41  * slp_flush_queue:	flushes and frees all messages on a queue
42  * slp_destroy_queue:	frees an empty queue.
43  */
44 
45 #include <stdio.h>
46 #include <stdlib.h>
47 #include <thread.h>
48 #include <synch.h>
49 #include <syslog.h>
50 #include <slp.h>
51 #include <slp-internal.h>
52 
/* Private implementation details */

/*
 * One node of the singly linked list that backs a queue. Each node
 * carries a single caller-supplied message pointer.
 */
typedef struct queue_entry {
	void *msg;			/* message stored by the producer */
	struct queue_entry *next;	/* following node, NULL at the end */
} slp_queue_entry_t;
59 
60 struct queue {
61 	slp_queue_entry_t *head;
62 	slp_queue_entry_t *tail;
63 	mutex_t *lock;
64 	cond_t *wait;
65 	int count;
66 };
67 
68 /*
69  * Creates, initializes, and returns a new queue.
70  * If an initialization error occured, returns NULL and sets err to
71  * the appropriate SLP error code.
72  * queues can operate in one of two modes: timed-wait, and infinite
73  * wait. The timeout parameter specifies which of these modes should
74  * be enabled for the new queue.
75  */
76 slp_queue_t *slp_new_queue(SLPError *err) {
77 	mutex_t *lock;
78 	cond_t *wait;
79 	struct queue *q;
80 
81 	*err = SLP_OK;
82 
83 	/* initialize new mutex and semaphore */
84 	if ((lock = calloc(1, sizeof (*lock))) == NULL) {
85 		*err = SLP_MEMORY_ALLOC_FAILED;
86 		slp_err(LOG_CRIT, 0, "slp_new_queue", "out of memory");
87 		return (NULL);
88 	}
89 
90 	/* intialize condition vars */
91 	if (!(wait = calloc(1, sizeof (*wait)))) {
92 		*err = SLP_MEMORY_ALLOC_FAILED;
93 		slp_err(LOG_CRIT, 0, "slp_new_queue", "out of memory");
94 		return (NULL);
95 	}
96 	(void) cond_init(wait, NULL, NULL);
97 
98 	/* create the queue */
99 	if ((q = malloc(sizeof (*q))) == NULL) {
100 		*err = SLP_MEMORY_ALLOC_FAILED;
101 		slp_err(LOG_CRIT, 0, "slp_new_queue", "out of memory");
102 		return (NULL);
103 	}
104 
105 	q->head = NULL;
106 	q->lock = lock;
107 	q->wait = wait;
108 	q->count = 0;
109 
110 	return (q);
111 }
112 
113 /*
114  * Adds msg to the tail of queue q.
115  * Returns an SLP error code: SLP_OK for no error, or SLP_MEMORY_ALLOC_FAILED
116  * if it couldn't allocate memory.
117  */
118 SLPError slp_enqueue(slp_queue_t *qa, void *msg) {
119 	slp_queue_entry_t *qe;
120 	struct queue *q = qa;
121 
122 	if ((qe = malloc(sizeof (*qe))) == NULL) {
123 		slp_err(LOG_CRIT, 0, "slp_enqueue", "out of memory");
124 		return (SLP_MEMORY_ALLOC_FAILED);
125 	}
126 
127 	(void) mutex_lock(q->lock);
128 	qe->msg = msg;
129 	qe->next = NULL;
130 	if (q->head != NULL) {	/* queue is not emptry */
131 		q->tail->next = qe;
132 		q->tail = qe;
133 	} else {		/* queue is empty */
134 		q->head = q->tail = qe;
135 	}
136 	q->count++;
137 	(void) mutex_unlock(q->lock);
138 	(void) cond_signal(q->wait);
139 
140 	return (SLP_OK);
141 }
142 
143 /*
144  * Inserts a message at the head of the queue. This is useful for inserting
145  * things like cancel messages.
146  */
147 SLPError slp_enqueue_at_head(slp_queue_t *qa, void *msg) {
148 	slp_queue_entry_t *qe;
149 	struct queue *q = qa;
150 
151 	if ((qe = malloc(sizeof (*qe))) == NULL) {
152 		slp_err(LOG_CRIT, 0, "slp_enqueue", "out of memory");
153 		return (SLP_MEMORY_ALLOC_FAILED);
154 	}
155 
156 	(void) mutex_lock(q->lock);
157 	qe->msg = msg;
158 	qe->next = q->head;
159 	q->head = qe;
160 
161 	q->count++;
162 	(void) mutex_unlock(q->lock);
163 	(void) cond_signal(q->wait);
164 
165 	return (SLP_OK);
166 }
167 
168 /*
169  * The core functionality for dequeue.
170  */
171 static void *dequeue_nolock(struct queue *q) {
172 	void *msg;
173 	slp_queue_entry_t *qe = q->head;
174 
175 	if (!qe)
176 		return (NULL);	/* shouldn't get here */
177 	msg = qe->msg;
178 	if (!qe->next)		/* last one in queue */
179 		q->head = q->tail = NULL;
180 	else
181 		q->head = qe->next;
182 	free(qe);
183 	q->count--;
184 	return (msg);
185 }
186 
187 /*
188  * Returns the first message waiting or arriving in the queue, or if no
189  * message is available after waiting the amount of time specified in
190  * 'to', returns NULL, and sets 'etimed' to true. If an error occured,
191  * returns NULL and sets 'etimed' to false.
192  */
193 void *slp_dequeue_timed(slp_queue_t *qa, timestruc_t *to, SLPBoolean *etimed) {
194 	int err;
195 	void *ans;
196 	struct queue *q = qa;
197 
198 	if (etimed)
199 		*etimed = SLP_FALSE;
200 
201 	(void) mutex_lock(q->lock);
202 	if (q->count > 0) {
203 		/* something's in the q, so no need to wait */
204 		goto msg_available;
205 	}
206 
207 	/* else wait */
208 	while (q->count == 0) {
209 		if (to) {
210 			err = cond_timedwait(q->wait, q->lock, to);
211 		} else {
212 			err = cond_wait(q->wait, q->lock);
213 		}
214 		if (err == ETIME) {
215 			(void) mutex_unlock(q->lock);
216 			*etimed = SLP_TRUE;
217 			return (NULL);
218 		}
219 	}
220 
221 msg_available:
222 	ans = dequeue_nolock(q);
223 	(void) mutex_unlock(q->lock);
224 	return (ans);
225 }
226 
227 /*
228  * Removes the first message from the queue and returns it.
229  * Returns NULL only on internal error.
230  */
231 void *slp_dequeue(slp_queue_t *qa) {
232 	return (slp_dequeue_timed(qa, NULL, NULL));
233 }
234 
235 /*
236  * Flushes the queue, using the caller-specified free function to
237  * free each message in the queue.
238  */
239 void slp_flush_queue(slp_queue_t *qa, void (*free_f)(void *)) {
240 	slp_queue_entry_t *p, *pn;
241 	struct queue *q = qa;
242 
243 	for (p = q->head; p; p = pn) {
244 		pn = p->next;
245 		free_f(p);
246 	}
247 }
248 
249 /*
250  * Frees a queue.
251  * The queue must be empty before it can be destroyed; slp_flush_queue
252  * can be used to empty a queue.
253  */
254 void slp_destroy_queue(slp_queue_t *qa) {
255 	struct queue *q = qa;
256 
257 	(void) mutex_destroy(q->lock);
258 	(void) cond_destroy(q->wait);
259 	free(q->lock);
260 	free(q->wait);
261 	free(q);
262 }
263