xref: /illumos-gate/usr/src/uts/common/io/stream.c (revision 581cede61ac9c14d8d4ea452562a567189eead78)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*	Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T	*/
22 /*	  All Rights Reserved  	*/
23 
24 /*
25  * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
26  * Use is subject to license terms.
27  */
28 
29 #include <sys/types.h>
30 #include <sys/param.h>
31 #include <sys/thread.h>
32 #include <sys/sysmacros.h>
33 #include <sys/stropts.h>
34 #include <sys/stream.h>
35 #include <sys/strsubr.h>
36 #include <sys/strsun.h>
37 #include <sys/conf.h>
38 #include <sys/debug.h>
39 #include <sys/cmn_err.h>
40 #include <sys/kmem.h>
41 #include <sys/atomic.h>
42 #include <sys/errno.h>
43 #include <sys/vtrace.h>
44 #include <sys/ftrace.h>
45 #include <sys/ontrap.h>
46 #include <sys/multidata.h>
47 #include <sys/multidata_impl.h>
48 #include <sys/sdt.h>
49 #include <sys/strft.h>
50 
51 #ifdef DEBUG
52 #include <sys/kmem_impl.h>
53 #endif
54 
55 /*
56  * This file contains all the STREAMS utility routines that may
57  * be used by modules and drivers.
58  */
59 
60 /*
61  * STREAMS message allocator: principles of operation
62  *
63  * The streams message allocator consists of all the routines that
64  * allocate, dup and free streams messages: allocb(), [d]esballoc[a],
65  * dupb(), freeb() and freemsg().  What follows is a high-level view
66  * of how the allocator works.
67  *
68  * Every streams message consists of one or more mblks, a dblk, and data.
69  * All mblks for all types of messages come from a common mblk_cache.
70  * The dblk and data come in several flavors, depending on how the
71  * message is allocated:
72  *
73  * (1) mblks up to DBLK_MAX_CACHE size are allocated from a collection of
74  *     fixed-size dblk/data caches. For message sizes that are multiples of
75  *     PAGESIZE, dblks are allocated separately from the buffer.
76  *     The associated buffer is allocated by the constructor using kmem_alloc().
77  *     For all other message sizes, dblk and its associated data is allocated
78  *     as a single contiguous chunk of memory.
79  *     Objects in these caches consist of a dblk plus its associated data.
80  *     allocb() determines the nearest-size cache by table lookup:
81  *     the dblk_cache[] array provides the mapping from size to dblk cache.
82  *
83  * (2) Large messages (size > DBLK_MAX_CACHE) are constructed by
84  *     kmem_alloc()'ing a buffer for the data and supplying that
85  *     buffer to gesballoc(), described below.
86  *
87  * (3) The four flavors of [d]esballoc[a] are all implemented by a
88  *     common routine, gesballoc() ("generic esballoc").  gesballoc()
89  *     allocates a dblk from the global dblk_esb_cache and sets db_base,
90  *     db_lim and db_frtnp to describe the caller-supplied buffer.
91  *
92  * While there are several routines to allocate messages, there is only
93  * one routine to free messages: freeb().  freeb() simply invokes the
94  * dblk's free method, dbp->db_free(), which is set at allocation time.
95  *
96  * dupb() creates a new reference to a message by allocating a new mblk,
97  * incrementing the dblk reference count and setting the dblk's free
98  * method to dblk_decref().  The dblk's original free method is retained
99  * in db_lastfree.  dblk_decref() decrements the reference count on each
100  * freeb().  If this is not the last reference it just frees the mblk;
101  * if this *is* the last reference, it restores db_free to db_lastfree,
102  * sets db_mblk to the current mblk (see below), and invokes db_lastfree.
103  *
104  * The implementation makes aggressive use of kmem object caching for
105  * maximum performance.  This makes the code simple and compact, but
106  * also a bit abstruse in some places.  The invariants that constitute a
107  * message's constructed state, described below, are more subtle than usual.
108  *
109  * Every dblk has an "attached mblk" as part of its constructed state.
110  * The mblk is allocated by the dblk's constructor and remains attached
111  * until the message is either dup'ed or pulled up.  In the dupb() case
112  * the mblk association doesn't matter until the last free, at which time
113  * dblk_decref() attaches the last mblk to the dblk.  pullupmsg() affects
114  * the mblk association because it swaps the leading mblks of two messages,
115  * so it is responsible for swapping their db_mblk pointers accordingly.
116  * From a constructed-state viewpoint it doesn't matter that a dblk's
117  * attached mblk can change while the message is allocated; all that
118  * matters is that the dblk has *some* attached mblk when it's freed.
119  *
120  * The sizes of the allocb() small-message caches are not magical.
121  * They represent a good trade-off between internal and external
122  * fragmentation for current workloads.  They should be reevaluated
123  * periodically, especially if allocations larger than DBLK_MAX_CACHE
124  * become common.  We use 64-byte alignment so that dblks don't
125  * straddle cache lines unnecessarily.
126  */
/*
 * Cache geometry: the dblk caches serve messages of up to DBLK_MAX_CACHE
 * bytes, are created with DBLK_CACHE_ALIGN alignment, and the dblk_cache[]
 * lookup table has one slot per DBLK_MIN_SIZE (1 << DBLK_SIZE_SHIFT) bytes.
 */
#define	DBLK_MAX_CACHE		73728
#define	DBLK_CACHE_ALIGN	64
#define	DBLK_MIN_SIZE		8
#define	DBLK_SIZE_SHIFT		3

/*
 * db_ref, db_type, db_flags and db_struioflag are read and written together
 * as one 32-bit word through DBLK_RTFU_WORD() (this assumes the four are
 * consecutive one-byte fields starting at db_ref).  DBLK_RTFU_SHIFT(field)
 * is the bit offset of a field within that word: eight bits per byte of
 * distance from db_ref on little-endian machines, or from the field to
 * db_struioflag on big-endian machines.
 */
#ifdef _BIG_ENDIAN
#define	DBLK_RTFU_SHIFT(field)	\
	(8 * (&((dblk_t *)0)->db_struioflag - &((dblk_t *)0)->field))
#else
#define	DBLK_RTFU_SHIFT(field)	\
	(8 * (&((dblk_t *)0)->field - &((dblk_t *)0)->db_ref))
#endif

/*
 * Compose an RTFU word from its four components.  (ref - 1) is OR'ed into
 * the flags byte; dblk_decref() masks that byte with DBLK_REFMIN to learn
 * the reference count at which the dblk must be released.  Every argument
 * is parenthesized so that expression arguments expand as intended.
 */
#define	DBLK_RTFU(ref, type, flags, uioflag)	\
	(((ref) << DBLK_RTFU_SHIFT(db_ref)) | \
	((type) << DBLK_RTFU_SHIFT(db_type)) | \
	(((flags) | ((ref) - 1)) << DBLK_RTFU_SHIFT(db_flags)) | \
	((uioflag) << DBLK_RTFU_SHIFT(db_struioflag)))

/* Mask selecting the db_ref bits of an RTFU word. */
#define	DBLK_RTFU_REF_MASK	(DBLK_REFMAX << DBLK_RTFU_SHIFT(db_ref))

/* The RTFU word of a dblk, viewed as a single uint32_t lvalue. */
#define	DBLK_RTFU_WORD(dbp)	(*((uint32_t *)&(dbp)->db_ref))

/* The 32-bit word of an mblk that starts at b_band, as a uint32_t lvalue. */
#define	MBLK_BAND_FLAG_WORD(mp)	(*((uint32_t *)&(mp)->b_band))
148 
149 static size_t dblk_sizes[] = {
150 #ifdef _LP64
151 	16, 80, 144, 208, 272, 336, 528, 1040, 1488, 1936, 2576, 3856,
152 	8192, 12048, 16384, 20240, 24576, 28432, 32768, 36624,
153 	40960, 44816, 49152, 53008, 57344, 61200, 65536, 69392,
154 #else
155 	64, 128, 320, 576, 1088, 1536, 1984, 2624, 3904,
156 	8192, 12096, 16384, 20288, 24576, 28480, 32768, 36672,
157 	40960, 44864, 49152, 53056, 57344, 61248, 65536, 69440,
158 #endif
159 	DBLK_MAX_CACHE, 0
160 };
161 
162 static struct kmem_cache *dblk_cache[DBLK_MAX_CACHE / DBLK_MIN_SIZE];
163 static struct kmem_cache *mblk_cache;
164 static struct kmem_cache *dblk_esb_cache;
165 static struct kmem_cache *fthdr_cache;
166 static struct kmem_cache *ftblk_cache;
167 
168 static void dblk_lastfree(mblk_t *mp, dblk_t *dbp);
169 static mblk_t *allocb_oversize(size_t size, int flags);
170 static int allocb_tryhard_fails;
171 static void frnop_func(void *arg);
172 frtn_t frnop = { frnop_func };
173 static void bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp);
174 
175 static boolean_t rwnext_enter(queue_t *qp);
176 static void rwnext_exit(queue_t *qp);
177 
178 /*
179  * Patchable mblk/dblk kmem_cache flags.
180  */
181 int dblk_kmem_flags = 0;
182 int mblk_kmem_flags = 0;
183 
184 static int
185 dblk_constructor(void *buf, void *cdrarg, int kmflags)
186 {
187 	dblk_t *dbp = buf;
188 	ssize_t msg_size = (ssize_t)cdrarg;
189 	size_t index;
190 
191 	ASSERT(msg_size != 0);
192 
193 	index = (msg_size - 1) >> DBLK_SIZE_SHIFT;
194 
195 	ASSERT(index < (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT));
196 
197 	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
198 		return (-1);
199 	if ((msg_size & PAGEOFFSET) == 0) {
200 		dbp->db_base = kmem_alloc(msg_size, kmflags);
201 		if (dbp->db_base == NULL) {
202 			kmem_cache_free(mblk_cache, dbp->db_mblk);
203 			return (-1);
204 		}
205 	} else {
206 		dbp->db_base = (unsigned char *)&dbp[1];
207 	}
208 
209 	dbp->db_mblk->b_datap = dbp;
210 	dbp->db_cache = dblk_cache[index];
211 	dbp->db_lim = dbp->db_base + msg_size;
212 	dbp->db_free = dbp->db_lastfree = dblk_lastfree;
213 	dbp->db_frtnp = NULL;
214 	dbp->db_fthdr = NULL;
215 	dbp->db_credp = NULL;
216 	dbp->db_cpid = -1;
217 	dbp->db_struioflag = 0;
218 	dbp->db_struioun.cksum.flags = 0;
219 	return (0);
220 }
221 
222 /*ARGSUSED*/
223 static int
224 dblk_esb_constructor(void *buf, void *cdrarg, int kmflags)
225 {
226 	dblk_t *dbp = buf;
227 
228 	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
229 		return (-1);
230 	dbp->db_mblk->b_datap = dbp;
231 	dbp->db_cache = dblk_esb_cache;
232 	dbp->db_fthdr = NULL;
233 	dbp->db_credp = NULL;
234 	dbp->db_cpid = -1;
235 	dbp->db_struioflag = 0;
236 	dbp->db_struioun.cksum.flags = 0;
237 	return (0);
238 }
239 
240 static int
241 bcache_dblk_constructor(void *buf, void *cdrarg, int kmflags)
242 {
243 	dblk_t *dbp = buf;
244 	bcache_t *bcp = cdrarg;
245 
246 	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
247 		return (-1);
248 
249 	dbp->db_base = kmem_cache_alloc(bcp->buffer_cache, kmflags);
250 	if (dbp->db_base == NULL) {
251 		kmem_cache_free(mblk_cache, dbp->db_mblk);
252 		return (-1);
253 	}
254 
255 	dbp->db_mblk->b_datap = dbp;
256 	dbp->db_cache = (void *)bcp;
257 	dbp->db_lim = dbp->db_base + bcp->size;
258 	dbp->db_free = dbp->db_lastfree = bcache_dblk_lastfree;
259 	dbp->db_frtnp = NULL;
260 	dbp->db_fthdr = NULL;
261 	dbp->db_credp = NULL;
262 	dbp->db_cpid = -1;
263 	dbp->db_struioflag = 0;
264 	dbp->db_struioun.cksum.flags = 0;
265 	return (0);
266 }
267 
268 /*ARGSUSED*/
269 static void
270 dblk_destructor(void *buf, void *cdrarg)
271 {
272 	dblk_t *dbp = buf;
273 	ssize_t msg_size = (ssize_t)cdrarg;
274 
275 	ASSERT(dbp->db_mblk->b_datap == dbp);
276 	ASSERT(msg_size != 0);
277 	ASSERT(dbp->db_struioflag == 0);
278 	ASSERT(dbp->db_struioun.cksum.flags == 0);
279 
280 	if ((msg_size & PAGEOFFSET) == 0) {
281 		kmem_free(dbp->db_base, msg_size);
282 	}
283 
284 	kmem_cache_free(mblk_cache, dbp->db_mblk);
285 }
286 
287 static void
288 bcache_dblk_destructor(void *buf, void *cdrarg)
289 {
290 	dblk_t *dbp = buf;
291 	bcache_t *bcp = cdrarg;
292 
293 	kmem_cache_free(bcp->buffer_cache, dbp->db_base);
294 
295 	ASSERT(dbp->db_mblk->b_datap == dbp);
296 	ASSERT(dbp->db_struioflag == 0);
297 	ASSERT(dbp->db_struioun.cksum.flags == 0);
298 
299 	kmem_cache_free(mblk_cache, dbp->db_mblk);
300 }
301 
302 /* ARGSUSED */
303 static int
304 ftblk_constructor(void *buf, void *cdrarg, int kmflags)
305 {
306 	ftblk_t *fbp = buf;
307 	int i;
308 
309 	bzero(fbp, sizeof (ftblk_t));
310 	if (str_ftstack != 0) {
311 		for (i = 0; i < FTBLK_EVNTS; i++)
312 			fbp->ev[i].stk = kmem_alloc(sizeof (ftstk_t), kmflags);
313 	}
314 
315 	return (0);
316 }
317 
318 /* ARGSUSED */
319 static void
320 ftblk_destructor(void *buf, void *cdrarg)
321 {
322 	ftblk_t *fbp = buf;
323 	int i;
324 
325 	if (str_ftstack != 0) {
326 		for (i = 0; i < FTBLK_EVNTS; i++) {
327 			if (fbp->ev[i].stk != NULL) {
328 				kmem_free(fbp->ev[i].stk, sizeof (ftstk_t));
329 				fbp->ev[i].stk = NULL;
330 			}
331 		}
332 	}
333 }
334 
335 static int
336 fthdr_constructor(void *buf, void *cdrarg, int kmflags)
337 {
338 	fthdr_t *fhp = buf;
339 
340 	return (ftblk_constructor(&fhp->first, cdrarg, kmflags));
341 }
342 
343 static void
344 fthdr_destructor(void *buf, void *cdrarg)
345 {
346 	fthdr_t *fhp = buf;
347 
348 	ftblk_destructor(&fhp->first, cdrarg);
349 }
350 
351 void
352 streams_msg_init(void)
353 {
354 	char name[40];
355 	size_t size;
356 	size_t lastsize = DBLK_MIN_SIZE;
357 	size_t *sizep;
358 	struct kmem_cache *cp;
359 	size_t tot_size;
360 	int offset;
361 
362 	mblk_cache = kmem_cache_create("streams_mblk", sizeof (mblk_t), 32,
363 	    NULL, NULL, NULL, NULL, NULL, mblk_kmem_flags);
364 
365 	for (sizep = dblk_sizes; (size = *sizep) != 0; sizep++) {
366 
367 		if ((offset = (size & PAGEOFFSET)) != 0) {
368 			/*
369 			 * We are in the middle of a page, dblk should
370 			 * be allocated on the same page
371 			 */
372 			tot_size = size + sizeof (dblk_t);
373 			ASSERT((offset + sizeof (dblk_t) + sizeof (kmem_slab_t))
374 			    < PAGESIZE);
375 			ASSERT((tot_size & (DBLK_CACHE_ALIGN - 1)) == 0);
376 
377 		} else {
378 
379 			/*
380 			 * buf size is multiple of page size, dblk and
381 			 * buffer are allocated separately.
382 			 */
383 
384 			ASSERT((size & (DBLK_CACHE_ALIGN - 1)) == 0);
385 			tot_size = sizeof (dblk_t);
386 		}
387 
388 		(void) sprintf(name, "streams_dblk_%ld", size);
389 		cp = kmem_cache_create(name, tot_size, DBLK_CACHE_ALIGN,
390 		    dblk_constructor, dblk_destructor, NULL, (void *)(size),
391 		    NULL, dblk_kmem_flags);
392 
393 		while (lastsize <= size) {
394 			dblk_cache[(lastsize - 1) >> DBLK_SIZE_SHIFT] = cp;
395 			lastsize += DBLK_MIN_SIZE;
396 		}
397 	}
398 
399 	dblk_esb_cache = kmem_cache_create("streams_dblk_esb", sizeof (dblk_t),
400 	    DBLK_CACHE_ALIGN, dblk_esb_constructor, dblk_destructor, NULL,
401 	    (void *)sizeof (dblk_t), NULL, dblk_kmem_flags);
402 	fthdr_cache = kmem_cache_create("streams_fthdr", sizeof (fthdr_t), 32,
403 	    fthdr_constructor, fthdr_destructor, NULL, NULL, NULL, 0);
404 	ftblk_cache = kmem_cache_create("streams_ftblk", sizeof (ftblk_t), 32,
405 	    ftblk_constructor, ftblk_destructor, NULL, NULL, NULL, 0);
406 
407 	/* Initialize Multidata caches */
408 	mmd_init();
409 
410 	/* initialize throttling queue for esballoc */
411 	esballoc_queue_init();
412 }
413 
414 /*ARGSUSED*/
415 mblk_t *
416 allocb(size_t size, uint_t pri)
417 {
418 	dblk_t *dbp;
419 	mblk_t *mp;
420 	size_t index;
421 
422 	index =  (size - 1)  >> DBLK_SIZE_SHIFT;
423 
424 	if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
425 		if (size != 0) {
426 			mp = allocb_oversize(size, KM_NOSLEEP);
427 			goto out;
428 		}
429 		index = 0;
430 	}
431 
432 	if ((dbp = kmem_cache_alloc(dblk_cache[index], KM_NOSLEEP)) == NULL) {
433 		mp = NULL;
434 		goto out;
435 	}
436 
437 	mp = dbp->db_mblk;
438 	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
439 	mp->b_next = mp->b_prev = mp->b_cont = NULL;
440 	mp->b_rptr = mp->b_wptr = dbp->db_base;
441 	mp->b_queue = NULL;
442 	MBLK_BAND_FLAG_WORD(mp) = 0;
443 	STR_FTALLOC(&dbp->db_fthdr, FTEV_ALLOCB, size);
444 out:
445 	FTRACE_1("allocb(): mp=0x%p", (uintptr_t)mp);
446 
447 	return (mp);
448 }
449 
450 /*
451  * Allocate an mblk taking db_credp and db_cpid from the template.
452  * Allow the cred to be NULL.
453  */
454 mblk_t *
455 allocb_tmpl(size_t size, const mblk_t *tmpl)
456 {
457 	mblk_t *mp = allocb(size, 0);
458 
459 	if (mp != NULL) {
460 		dblk_t *src = tmpl->b_datap;
461 		dblk_t *dst = mp->b_datap;
462 		cred_t *cr;
463 		pid_t cpid;
464 
465 		cr = msg_getcred(tmpl, &cpid);
466 		if (cr != NULL)
467 			crhold(dst->db_credp = cr);
468 		dst->db_cpid = cpid;
469 		dst->db_type = src->db_type;
470 	}
471 	return (mp);
472 }
473 
474 mblk_t *
475 allocb_cred(size_t size, cred_t *cr, pid_t cpid)
476 {
477 	mblk_t *mp = allocb(size, 0);
478 
479 	ASSERT(cr != NULL);
480 	if (mp != NULL) {
481 		dblk_t *dbp = mp->b_datap;
482 
483 		crhold(dbp->db_credp = cr);
484 		dbp->db_cpid = cpid;
485 	}
486 	return (mp);
487 }
488 
489 mblk_t *
490 allocb_cred_wait(size_t size, uint_t flags, int *error, cred_t *cr, pid_t cpid)
491 {
492 	mblk_t *mp = allocb_wait(size, 0, flags, error);
493 
494 	ASSERT(cr != NULL);
495 	if (mp != NULL) {
496 		dblk_t *dbp = mp->b_datap;
497 
498 		crhold(dbp->db_credp = cr);
499 		dbp->db_cpid = cpid;
500 	}
501 
502 	return (mp);
503 }
504 
505 /*
506  * Extract the db_cred (and optionally db_cpid) from a message.
507  * We find the first mblk which has a non-NULL db_cred and use that.
508  * If none found we return NULL.
509  * Does NOT get a hold on the cred.
510  */
511 cred_t *
512 msg_getcred(const mblk_t *mp, pid_t *cpidp)
513 {
514 	cred_t *cr = NULL;
515 	cred_t *cr2;
516 	mblk_t *mp2;
517 
518 	while (mp != NULL) {
519 		dblk_t *dbp = mp->b_datap;
520 
521 		cr = dbp->db_credp;
522 		if (cr == NULL) {
523 			mp = mp->b_cont;
524 			continue;
525 		}
526 		if (cpidp != NULL)
527 			*cpidp = dbp->db_cpid;
528 
529 #ifdef DEBUG
530 		/*
531 		 * Normally there should at most one db_credp in a message.
532 		 * But if there are multiple (as in the case of some M_IOC*
533 		 * and some internal messages in TCP/IP bind logic) then
534 		 * they must be identical in the normal case.
535 		 * However, a socket can be shared between different uids
536 		 * in which case data queued in TCP would be from different
537 		 * creds. Thus we can only assert for the zoneid being the
538 		 * same. Due to Multi-level Level Ports for TX, some
539 		 * cred_t can have a NULL cr_zone, and we skip the comparison
540 		 * in that case.
541 		 */
542 		mp2 = mp->b_cont;
543 		while (mp2 != NULL) {
544 			cr2 = DB_CRED(mp2);
545 			if (cr2 != NULL) {
546 				DTRACE_PROBE2(msg__getcred,
547 				    cred_t *, cr, cred_t *, cr2);
548 				ASSERT(crgetzoneid(cr) == crgetzoneid(cr2) ||
549 				    crgetzone(cr) == NULL ||
550 				    crgetzone(cr2) == NULL);
551 			}
552 			mp2 = mp2->b_cont;
553 		}
554 #endif
555 		return (cr);
556 	}
557 	if (cpidp != NULL)
558 		*cpidp = NOPID;
559 	return (NULL);
560 }
561 
562 /*
563  * Variant of msg_getcred which, when a cred is found
564  * 1. Returns with a hold on the cred
565  * 2. Clears the first cred in the mblk.
566  * This is more efficient to use than a msg_getcred() + crhold() when
567  * the message is freed after the cred has been extracted.
568  *
569  * The caller is responsible for ensuring that there is no other reference
570  * on the message since db_credp can not be cleared when there are other
571  * references.
572  */
573 cred_t *
574 msg_extractcred(mblk_t *mp, pid_t *cpidp)
575 {
576 	cred_t *cr = NULL;
577 	cred_t *cr2;
578 	mblk_t *mp2;
579 
580 	while (mp != NULL) {
581 		dblk_t *dbp = mp->b_datap;
582 
583 		cr = dbp->db_credp;
584 		if (cr == NULL) {
585 			mp = mp->b_cont;
586 			continue;
587 		}
588 		ASSERT(dbp->db_ref == 1);
589 		dbp->db_credp = NULL;
590 		if (cpidp != NULL)
591 			*cpidp = dbp->db_cpid;
592 #ifdef DEBUG
593 		/*
594 		 * Normally there should at most one db_credp in a message.
595 		 * But if there are multiple (as in the case of some M_IOC*
596 		 * and some internal messages in TCP/IP bind logic) then
597 		 * they must be identical in the normal case.
598 		 * However, a socket can be shared between different uids
599 		 * in which case data queued in TCP would be from different
600 		 * creds. Thus we can only assert for the zoneid being the
601 		 * same. Due to Multi-level Level Ports for TX, some
602 		 * cred_t can have a NULL cr_zone, and we skip the comparison
603 		 * in that case.
604 		 */
605 		mp2 = mp->b_cont;
606 		while (mp2 != NULL) {
607 			cr2 = DB_CRED(mp2);
608 			if (cr2 != NULL) {
609 				DTRACE_PROBE2(msg__extractcred,
610 				    cred_t *, cr, cred_t *, cr2);
611 				ASSERT(crgetzoneid(cr) == crgetzoneid(cr2) ||
612 				    crgetzone(cr) == NULL ||
613 				    crgetzone(cr2) == NULL);
614 			}
615 			mp2 = mp2->b_cont;
616 		}
617 #endif
618 		return (cr);
619 	}
620 	return (NULL);
621 }
622 /*
623  * Get the label for a message. Uses the first mblk in the message
624  * which has a non-NULL db_credp.
625  * Returns NULL if there is no credp.
626  */
627 extern struct ts_label_s *
628 msg_getlabel(const mblk_t *mp)
629 {
630 	cred_t *cr = msg_getcred(mp, NULL);
631 
632 	if (cr == NULL)
633 		return (NULL);
634 
635 	return (crgetlabel(cr));
636 }
637 
638 void
639 freeb(mblk_t *mp)
640 {
641 	dblk_t *dbp = mp->b_datap;
642 
643 	ASSERT(dbp->db_ref > 0);
644 	ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
645 	FTRACE_1("freeb(): mp=0x%lx", (uintptr_t)mp);
646 
647 	STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);
648 
649 	dbp->db_free(mp, dbp);
650 }
651 
652 void
653 freemsg(mblk_t *mp)
654 {
655 	FTRACE_1("freemsg(): mp=0x%lx", (uintptr_t)mp);
656 	while (mp) {
657 		dblk_t *dbp = mp->b_datap;
658 		mblk_t *mp_cont = mp->b_cont;
659 
660 		ASSERT(dbp->db_ref > 0);
661 		ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
662 
663 		STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);
664 
665 		dbp->db_free(mp, dbp);
666 		mp = mp_cont;
667 	}
668 }
669 
670 /*
671  * Reallocate a block for another use.  Try hard to use the old block.
672  * If the old data is wanted (copy), leave b_wptr at the end of the data,
673  * otherwise return b_wptr = b_rptr.
674  *
675  * This routine is private and unstable.
676  */
677 mblk_t	*
678 reallocb(mblk_t *mp, size_t size, uint_t copy)
679 {
680 	mblk_t		*mp1;
681 	unsigned char	*old_rptr;
682 	ptrdiff_t	cur_size;
683 
684 	if (mp == NULL)
685 		return (allocb(size, BPRI_HI));
686 
687 	cur_size = mp->b_wptr - mp->b_rptr;
688 	old_rptr = mp->b_rptr;
689 
690 	ASSERT(mp->b_datap->db_ref != 0);
691 
692 	if (mp->b_datap->db_ref == 1 && MBLKSIZE(mp) >= size) {
693 		/*
694 		 * If the data is wanted and it will fit where it is, no
695 		 * work is required.
696 		 */
697 		if (copy && mp->b_datap->db_lim - mp->b_rptr >= size)
698 			return (mp);
699 
700 		mp->b_wptr = mp->b_rptr = mp->b_datap->db_base;
701 		mp1 = mp;
702 	} else if ((mp1 = allocb_tmpl(size, mp)) != NULL) {
703 		/* XXX other mp state could be copied too, db_flags ... ? */
704 		mp1->b_cont = mp->b_cont;
705 	} else {
706 		return (NULL);
707 	}
708 
709 	if (copy) {
710 		bcopy(old_rptr, mp1->b_rptr, cur_size);
711 		mp1->b_wptr = mp1->b_rptr + cur_size;
712 	}
713 
714 	if (mp != mp1)
715 		freeb(mp);
716 
717 	return (mp1);
718 }
719 
720 static void
721 dblk_lastfree(mblk_t *mp, dblk_t *dbp)
722 {
723 	ASSERT(dbp->db_mblk == mp);
724 	if (dbp->db_fthdr != NULL)
725 		str_ftfree(dbp);
726 
727 	/* set credp and projid to be 'unspecified' before returning to cache */
728 	if (dbp->db_credp != NULL) {
729 		crfree(dbp->db_credp);
730 		dbp->db_credp = NULL;
731 	}
732 	dbp->db_cpid = -1;
733 
734 	/* Reset the struioflag and the checksum flag fields */
735 	dbp->db_struioflag = 0;
736 	dbp->db_struioun.cksum.flags = 0;
737 
738 	/* and the COOKED and/or UIOA flag(s) */
739 	dbp->db_flags &= ~(DBLK_COOKED | DBLK_UIOA);
740 
741 	kmem_cache_free(dbp->db_cache, dbp);
742 }
743 
744 static void
745 dblk_decref(mblk_t *mp, dblk_t *dbp)
746 {
747 	if (dbp->db_ref != 1) {
748 		uint32_t rtfu = atomic_add_32_nv(&DBLK_RTFU_WORD(dbp),
749 		    -(1 << DBLK_RTFU_SHIFT(db_ref)));
750 		/*
751 		 * atomic_add_32_nv() just decremented db_ref, so we no longer
752 		 * have a reference to the dblk, which means another thread
753 		 * could free it.  Therefore we cannot examine the dblk to
754 		 * determine whether ours was the last reference.  Instead,
755 		 * we extract the new and minimum reference counts from rtfu.
756 		 * Note that all we're really saying is "if (ref != refmin)".
757 		 */
758 		if (((rtfu >> DBLK_RTFU_SHIFT(db_ref)) & DBLK_REFMAX) !=
759 		    ((rtfu >> DBLK_RTFU_SHIFT(db_flags)) & DBLK_REFMIN)) {
760 			kmem_cache_free(mblk_cache, mp);
761 			return;
762 		}
763 	}
764 	dbp->db_mblk = mp;
765 	dbp->db_free = dbp->db_lastfree;
766 	dbp->db_lastfree(mp, dbp);
767 }
768 
769 mblk_t *
770 dupb(mblk_t *mp)
771 {
772 	dblk_t *dbp = mp->b_datap;
773 	mblk_t *new_mp;
774 	uint32_t oldrtfu, newrtfu;
775 
776 	if ((new_mp = kmem_cache_alloc(mblk_cache, KM_NOSLEEP)) == NULL)
777 		goto out;
778 
779 	new_mp->b_next = new_mp->b_prev = new_mp->b_cont = NULL;
780 	new_mp->b_rptr = mp->b_rptr;
781 	new_mp->b_wptr = mp->b_wptr;
782 	new_mp->b_datap = dbp;
783 	new_mp->b_queue = NULL;
784 	MBLK_BAND_FLAG_WORD(new_mp) = MBLK_BAND_FLAG_WORD(mp);
785 
786 	STR_FTEVENT_MBLK(mp, caller(), FTEV_DUPB, dbp->db_ref);
787 
788 	dbp->db_free = dblk_decref;
789 	do {
790 		ASSERT(dbp->db_ref > 0);
791 		oldrtfu = DBLK_RTFU_WORD(dbp);
792 		newrtfu = oldrtfu + (1 << DBLK_RTFU_SHIFT(db_ref));
793 		/*
794 		 * If db_ref is maxed out we can't dup this message anymore.
795 		 */
796 		if ((oldrtfu & DBLK_RTFU_REF_MASK) == DBLK_RTFU_REF_MASK) {
797 			kmem_cache_free(mblk_cache, new_mp);
798 			new_mp = NULL;
799 			goto out;
800 		}
801 	} while (cas32(&DBLK_RTFU_WORD(dbp), oldrtfu, newrtfu) != oldrtfu);
802 
803 out:
804 	FTRACE_1("dupb(): new_mp=0x%lx", (uintptr_t)new_mp);
805 	return (new_mp);
806 }
807 
808 static void
809 dblk_lastfree_desb(mblk_t *mp, dblk_t *dbp)
810 {
811 	frtn_t *frp = dbp->db_frtnp;
812 
813 	ASSERT(dbp->db_mblk == mp);
814 	frp->free_func(frp->free_arg);
815 	if (dbp->db_fthdr != NULL)
816 		str_ftfree(dbp);
817 
818 	/* set credp and projid to be 'unspecified' before returning to cache */
819 	if (dbp->db_credp != NULL) {
820 		crfree(dbp->db_credp);
821 		dbp->db_credp = NULL;
822 	}
823 	dbp->db_cpid = -1;
824 	dbp->db_struioflag = 0;
825 	dbp->db_struioun.cksum.flags = 0;
826 
827 	kmem_cache_free(dbp->db_cache, dbp);
828 }
829 
/*
 * Free routine that intentionally does nothing; it is the callback of
 * the global frnop descriptor.
 */
/*ARGSUSED*/
static void
frnop_func(void *arg)
{
	/* nothing to release */
}
835 
836 /*
837  * Generic esballoc used to implement the four flavors: [d]esballoc[a].
838  */
839 static mblk_t *
840 gesballoc(unsigned char *base, size_t size, uint32_t db_rtfu, frtn_t *frp,
841 	void (*lastfree)(mblk_t *, dblk_t *), int kmflags)
842 {
843 	dblk_t *dbp;
844 	mblk_t *mp;
845 
846 	ASSERT(base != NULL && frp != NULL);
847 
848 	if ((dbp = kmem_cache_alloc(dblk_esb_cache, kmflags)) == NULL) {
849 		mp = NULL;
850 		goto out;
851 	}
852 
853 	mp = dbp->db_mblk;
854 	dbp->db_base = base;
855 	dbp->db_lim = base + size;
856 	dbp->db_free = dbp->db_lastfree = lastfree;
857 	dbp->db_frtnp = frp;
858 	DBLK_RTFU_WORD(dbp) = db_rtfu;
859 	mp->b_next = mp->b_prev = mp->b_cont = NULL;
860 	mp->b_rptr = mp->b_wptr = base;
861 	mp->b_queue = NULL;
862 	MBLK_BAND_FLAG_WORD(mp) = 0;
863 
864 out:
865 	FTRACE_1("gesballoc(): mp=0x%lx", (uintptr_t)mp);
866 	return (mp);
867 }
868 
869 /*ARGSUSED*/
870 mblk_t *
871 esballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
872 {
873 	mblk_t *mp;
874 
875 	/*
876 	 * Note that this is structured to allow the common case (i.e.
877 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
878 	 * call optimization.
879 	 */
880 	if (!str_ftnever) {
881 		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
882 		    frp, freebs_enqueue, KM_NOSLEEP);
883 
884 		if (mp != NULL)
885 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
886 		return (mp);
887 	}
888 
889 	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
890 	    frp, freebs_enqueue, KM_NOSLEEP));
891 }
892 
893 /*
894  * Same as esballoc() but sleeps waiting for memory.
895  */
896 /*ARGSUSED*/
897 mblk_t *
898 esballoc_wait(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
899 {
900 	mblk_t *mp;
901 
902 	/*
903 	 * Note that this is structured to allow the common case (i.e.
904 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
905 	 * call optimization.
906 	 */
907 	if (!str_ftnever) {
908 		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
909 		    frp, freebs_enqueue, KM_SLEEP);
910 
911 		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
912 		return (mp);
913 	}
914 
915 	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
916 	    frp, freebs_enqueue, KM_SLEEP));
917 }
918 
919 /*ARGSUSED*/
920 mblk_t *
921 desballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
922 {
923 	mblk_t *mp;
924 
925 	/*
926 	 * Note that this is structured to allow the common case (i.e.
927 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
928 	 * call optimization.
929 	 */
930 	if (!str_ftnever) {
931 		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
932 		    frp, dblk_lastfree_desb, KM_NOSLEEP);
933 
934 		if (mp != NULL)
935 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOC, size);
936 		return (mp);
937 	}
938 
939 	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
940 	    frp, dblk_lastfree_desb, KM_NOSLEEP));
941 }
942 
943 /*ARGSUSED*/
944 mblk_t *
945 esballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
946 {
947 	mblk_t *mp;
948 
949 	/*
950 	 * Note that this is structured to allow the common case (i.e.
951 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
952 	 * call optimization.
953 	 */
954 	if (!str_ftnever) {
955 		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
956 		    frp, freebs_enqueue, KM_NOSLEEP);
957 
958 		if (mp != NULL)
959 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOCA, size);
960 		return (mp);
961 	}
962 
963 	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
964 	    frp, freebs_enqueue, KM_NOSLEEP));
965 }
966 
967 /*ARGSUSED*/
968 mblk_t *
969 desballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
970 {
971 	mblk_t *mp;
972 
973 	/*
974 	 * Note that this is structured to allow the common case (i.e.
975 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
976 	 * call optimization.
977 	 */
978 	if (!str_ftnever) {
979 		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
980 		    frp, dblk_lastfree_desb, KM_NOSLEEP);
981 
982 		if (mp != NULL)
983 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOCA, size);
984 		return (mp);
985 	}
986 
987 	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
988 	    frp, dblk_lastfree_desb, KM_NOSLEEP));
989 }
990 
991 static void
992 bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp)
993 {
994 	bcache_t *bcp = dbp->db_cache;
995 
996 	ASSERT(dbp->db_mblk == mp);
997 	if (dbp->db_fthdr != NULL)
998 		str_ftfree(dbp);
999 
1000 	/* set credp and projid to be 'unspecified' before returning to cache */
1001 	if (dbp->db_credp != NULL) {
1002 		crfree(dbp->db_credp);
1003 		dbp->db_credp = NULL;
1004 	}
1005 	dbp->db_cpid = -1;
1006 	dbp->db_struioflag = 0;
1007 	dbp->db_struioun.cksum.flags = 0;
1008 
1009 	mutex_enter(&bcp->mutex);
1010 	kmem_cache_free(bcp->dblk_cache, dbp);
1011 	bcp->alloc--;
1012 
1013 	if (bcp->alloc == 0 && bcp->destroy != 0) {
1014 		kmem_cache_destroy(bcp->dblk_cache);
1015 		kmem_cache_destroy(bcp->buffer_cache);
1016 		mutex_exit(&bcp->mutex);
1017 		mutex_destroy(&bcp->mutex);
1018 		kmem_free(bcp, sizeof (bcache_t));
1019 	} else {
1020 		mutex_exit(&bcp->mutex);
1021 	}
1022 }
1023 
1024 bcache_t *
1025 bcache_create(char *name, size_t size, uint_t align)
1026 {
1027 	bcache_t *bcp;
1028 	char buffer[255];
1029 
1030 	ASSERT((align & (align - 1)) == 0);
1031 
1032 	if ((bcp = kmem_alloc(sizeof (bcache_t), KM_NOSLEEP)) == NULL)
1033 		return (NULL);
1034 
1035 	bcp->size = size;
1036 	bcp->align = align;
1037 	bcp->alloc = 0;
1038 	bcp->destroy = 0;
1039 
1040 	mutex_init(&bcp->mutex, NULL, MUTEX_DRIVER, NULL);
1041 
1042 	(void) sprintf(buffer, "%s_buffer_cache", name);
1043 	bcp->buffer_cache = kmem_cache_create(buffer, size, align, NULL, NULL,
1044 	    NULL, NULL, NULL, 0);
1045 	(void) sprintf(buffer, "%s_dblk_cache", name);
1046 	bcp->dblk_cache = kmem_cache_create(buffer, sizeof (dblk_t),
1047 	    DBLK_CACHE_ALIGN, bcache_dblk_constructor, bcache_dblk_destructor,
1048 	    NULL, (void *)bcp, NULL, 0);
1049 
1050 	return (bcp);
1051 }
1052 
1053 void
1054 bcache_destroy(bcache_t *bcp)
1055 {
1056 	ASSERT(bcp != NULL);
1057 
1058 	mutex_enter(&bcp->mutex);
1059 	if (bcp->alloc == 0) {
1060 		kmem_cache_destroy(bcp->dblk_cache);
1061 		kmem_cache_destroy(bcp->buffer_cache);
1062 		mutex_exit(&bcp->mutex);
1063 		mutex_destroy(&bcp->mutex);
1064 		kmem_free(bcp, sizeof (bcache_t));
1065 	} else {
1066 		bcp->destroy++;
1067 		mutex_exit(&bcp->mutex);
1068 	}
1069 }
1070 
1071 /*ARGSUSED*/
1072 mblk_t *
1073 bcache_allocb(bcache_t *bcp, uint_t pri)
1074 {
1075 	dblk_t *dbp;
1076 	mblk_t *mp = NULL;
1077 
1078 	ASSERT(bcp != NULL);
1079 
1080 	mutex_enter(&bcp->mutex);
1081 	if (bcp->destroy != 0) {
1082 		mutex_exit(&bcp->mutex);
1083 		goto out;
1084 	}
1085 
1086 	if ((dbp = kmem_cache_alloc(bcp->dblk_cache, KM_NOSLEEP)) == NULL) {
1087 		mutex_exit(&bcp->mutex);
1088 		goto out;
1089 	}
1090 	bcp->alloc++;
1091 	mutex_exit(&bcp->mutex);
1092 
1093 	ASSERT(((uintptr_t)(dbp->db_base) & (bcp->align - 1)) == 0);
1094 
1095 	mp = dbp->db_mblk;
1096 	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
1097 	mp->b_next = mp->b_prev = mp->b_cont = NULL;
1098 	mp->b_rptr = mp->b_wptr = dbp->db_base;
1099 	mp->b_queue = NULL;
1100 	MBLK_BAND_FLAG_WORD(mp) = 0;
1101 	STR_FTALLOC(&dbp->db_fthdr, FTEV_BCALLOCB, bcp->size);
1102 out:
1103 	FTRACE_1("bcache_allocb(): mp=0x%p", (uintptr_t)mp);
1104 
1105 	return (mp);
1106 }
1107 
1108 static void
1109 dblk_lastfree_oversize(mblk_t *mp, dblk_t *dbp)
1110 {
1111 	ASSERT(dbp->db_mblk == mp);
1112 	if (dbp->db_fthdr != NULL)
1113 		str_ftfree(dbp);
1114 
1115 	/* set credp and projid to be 'unspecified' before returning to cache */
1116 	if (dbp->db_credp != NULL) {
1117 		crfree(dbp->db_credp);
1118 		dbp->db_credp = NULL;
1119 	}
1120 	dbp->db_cpid = -1;
1121 	dbp->db_struioflag = 0;
1122 	dbp->db_struioun.cksum.flags = 0;
1123 
1124 	kmem_free(dbp->db_base, dbp->db_lim - dbp->db_base);
1125 	kmem_cache_free(dbp->db_cache, dbp);
1126 }
1127 
1128 static mblk_t *
1129 allocb_oversize(size_t size, int kmflags)
1130 {
1131 	mblk_t *mp;
1132 	void *buf;
1133 
1134 	size = P2ROUNDUP(size, DBLK_CACHE_ALIGN);
1135 	if ((buf = kmem_alloc(size, kmflags)) == NULL)
1136 		return (NULL);
1137 	if ((mp = gesballoc(buf, size, DBLK_RTFU(1, M_DATA, 0, 0),
1138 	    &frnop, dblk_lastfree_oversize, kmflags)) == NULL)
1139 		kmem_free(buf, size);
1140 
1141 	if (mp != NULL)
1142 		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBIG, size);
1143 
1144 	return (mp);
1145 }
1146 
1147 mblk_t *
1148 allocb_tryhard(size_t target_size)
1149 {
1150 	size_t size;
1151 	mblk_t *bp;
1152 
1153 	for (size = target_size; size < target_size + 512;
1154 	    size += DBLK_CACHE_ALIGN)
1155 		if ((bp = allocb(size, BPRI_HI)) != NULL)
1156 			return (bp);
1157 	allocb_tryhard_fails++;
1158 	return (NULL);
1159 }
1160 
1161 /*
1162  * This routine is consolidation private for STREAMS internal use
1163  * This routine may only be called from sync routines (i.e., not
1164  * from put or service procedures).  It is located here (rather
1165  * than strsubr.c) so that we don't have to expose all of the
1166  * allocb() implementation details in header files.
1167  */
1168 mblk_t *
1169 allocb_wait(size_t size, uint_t pri, uint_t flags, int *error)
1170 {
1171 	dblk_t *dbp;
1172 	mblk_t *mp;
1173 	size_t index;
1174 
1175 	index = (size -1) >> DBLK_SIZE_SHIFT;
1176 
1177 	if (flags & STR_NOSIG) {
1178 		if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
1179 			if (size != 0) {
1180 				mp = allocb_oversize(size, KM_SLEEP);
1181 				FTRACE_1("allocb_wait (NOSIG): mp=0x%lx",
1182 				    (uintptr_t)mp);
1183 				return (mp);
1184 			}
1185 			index = 0;
1186 		}
1187 
1188 		dbp = kmem_cache_alloc(dblk_cache[index], KM_SLEEP);
1189 		mp = dbp->db_mblk;
1190 		DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
1191 		mp->b_next = mp->b_prev = mp->b_cont = NULL;
1192 		mp->b_rptr = mp->b_wptr = dbp->db_base;
1193 		mp->b_queue = NULL;
1194 		MBLK_BAND_FLAG_WORD(mp) = 0;
1195 		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBW, size);
1196 
1197 		FTRACE_1("allocb_wait (NOSIG): mp=0x%lx", (uintptr_t)mp);
1198 
1199 	} else {
1200 		while ((mp = allocb(size, pri)) == NULL) {
1201 			if ((*error = strwaitbuf(size, BPRI_HI)) != 0)
1202 				return (NULL);
1203 		}
1204 	}
1205 
1206 	return (mp);
1207 }
1208 
1209 /*
1210  * Call function 'func' with 'arg' when a class zero block can
1211  * be allocated with priority 'pri'.
1212  */
1213 bufcall_id_t
1214 esbbcall(uint_t pri, void (*func)(void *), void *arg)
1215 {
1216 	return (bufcall(1, pri, func, arg));
1217 }
1218 
1219 /*
1220  * Allocates an iocblk (M_IOCTL) block. Properly sets the credentials
1221  * ioc_id, rval and error of the struct ioctl to set up an ioctl call.
1222  * This provides consistency for all internal allocators of ioctl.
1223  */
1224 mblk_t *
1225 mkiocb(uint_t cmd)
1226 {
1227 	struct iocblk	*ioc;
1228 	mblk_t		*mp;
1229 
1230 	/*
1231 	 * Allocate enough space for any of the ioctl related messages.
1232 	 */
1233 	if ((mp = allocb(sizeof (union ioctypes), BPRI_MED)) == NULL)
1234 		return (NULL);
1235 
1236 	bzero(mp->b_rptr, sizeof (union ioctypes));
1237 
1238 	/*
1239 	 * Set the mblk_t information and ptrs correctly.
1240 	 */
1241 	mp->b_wptr += sizeof (struct iocblk);
1242 	mp->b_datap->db_type = M_IOCTL;
1243 
1244 	/*
1245 	 * Fill in the fields.
1246 	 */
1247 	ioc		= (struct iocblk *)mp->b_rptr;
1248 	ioc->ioc_cmd	= cmd;
1249 	ioc->ioc_cr	= kcred;
1250 	ioc->ioc_id	= getiocseqno();
1251 	ioc->ioc_flag	= IOC_NATIVE;
1252 	return (mp);
1253 }
1254 
1255 /*
1256  * test if block of given size can be allocated with a request of
1257  * the given priority.
1258  * 'pri' is no longer used, but is retained for compatibility.
1259  */
1260 /* ARGSUSED */
1261 int
1262 testb(size_t size, uint_t pri)
1263 {
1264 	return ((size + sizeof (dblk_t)) <= kmem_avail());
1265 }
1266 
1267 /*
1268  * Call function 'func' with argument 'arg' when there is a reasonably
1269  * good chance that a block of size 'size' can be allocated.
1270  * 'pri' is no longer used, but is retained for compatibility.
1271  */
1272 /* ARGSUSED */
1273 bufcall_id_t
1274 bufcall(size_t size, uint_t pri, void (*func)(void *), void *arg)
1275 {
1276 	static long bid = 1;	/* always odd to save checking for zero */
1277 	bufcall_id_t bc_id;
1278 	struct strbufcall *bcp;
1279 
1280 	if ((bcp = kmem_alloc(sizeof (strbufcall_t), KM_NOSLEEP)) == NULL)
1281 		return (0);
1282 
1283 	bcp->bc_func = func;
1284 	bcp->bc_arg = arg;
1285 	bcp->bc_size = size;
1286 	bcp->bc_next = NULL;
1287 	bcp->bc_executor = NULL;
1288 
1289 	mutex_enter(&strbcall_lock);
1290 	/*
1291 	 * After bcp is linked into strbcalls and strbcall_lock is dropped there
1292 	 * should be no references to bcp since it may be freed by
1293 	 * runbufcalls(). Since bcp_id field is returned, we save its value in
1294 	 * the local var.
1295 	 */
1296 	bc_id = bcp->bc_id = (bufcall_id_t)(bid += 2);	/* keep it odd */
1297 
1298 	/*
1299 	 * add newly allocated stream event to existing
1300 	 * linked list of events.
1301 	 */
1302 	if (strbcalls.bc_head == NULL) {
1303 		strbcalls.bc_head = strbcalls.bc_tail = bcp;
1304 	} else {
1305 		strbcalls.bc_tail->bc_next = bcp;
1306 		strbcalls.bc_tail = bcp;
1307 	}
1308 
1309 	cv_signal(&strbcall_cv);
1310 	mutex_exit(&strbcall_lock);
1311 	return (bc_id);
1312 }
1313 
1314 /*
1315  * Cancel a bufcall request.
1316  */
1317 void
1318 unbufcall(bufcall_id_t id)
1319 {
1320 	strbufcall_t *bcp, *pbcp;
1321 
1322 	mutex_enter(&strbcall_lock);
1323 again:
1324 	pbcp = NULL;
1325 	for (bcp = strbcalls.bc_head; bcp; bcp = bcp->bc_next) {
1326 		if (id == bcp->bc_id)
1327 			break;
1328 		pbcp = bcp;
1329 	}
1330 	if (bcp) {
1331 		if (bcp->bc_executor != NULL) {
1332 			if (bcp->bc_executor != curthread) {
1333 				cv_wait(&bcall_cv, &strbcall_lock);
1334 				goto again;
1335 			}
1336 		} else {
1337 			if (pbcp)
1338 				pbcp->bc_next = bcp->bc_next;
1339 			else
1340 				strbcalls.bc_head = bcp->bc_next;
1341 			if (bcp == strbcalls.bc_tail)
1342 				strbcalls.bc_tail = pbcp;
1343 			kmem_free(bcp, sizeof (strbufcall_t));
1344 		}
1345 	}
1346 	mutex_exit(&strbcall_lock);
1347 }
1348 
1349 /*
1350  * Duplicate a message block by block (uses dupb), returning
1351  * a pointer to the duplicate message.
1352  * Returns a non-NULL value only if the entire message
1353  * was dup'd.
1354  */
1355 mblk_t *
1356 dupmsg(mblk_t *bp)
1357 {
1358 	mblk_t *head, *nbp;
1359 
1360 	if (!bp || !(nbp = head = dupb(bp)))
1361 		return (NULL);
1362 
1363 	while (bp->b_cont) {
1364 		if (!(nbp->b_cont = dupb(bp->b_cont))) {
1365 			freemsg(head);
1366 			return (NULL);
1367 		}
1368 		nbp = nbp->b_cont;
1369 		bp = bp->b_cont;
1370 	}
1371 	return (head);
1372 }
1373 
1374 #define	DUPB_NOLOAN(bp) \
1375 	((((bp)->b_datap->db_struioflag & STRUIO_ZC) != 0) ? \
1376 	copyb((bp)) : dupb((bp)))
1377 
1378 mblk_t *
1379 dupmsg_noloan(mblk_t *bp)
1380 {
1381 	mblk_t *head, *nbp;
1382 
1383 	if (bp == NULL || DB_TYPE(bp) != M_DATA ||
1384 	    ((nbp = head = DUPB_NOLOAN(bp)) == NULL))
1385 		return (NULL);
1386 
1387 	while (bp->b_cont) {
1388 		if ((nbp->b_cont = DUPB_NOLOAN(bp->b_cont)) == NULL) {
1389 			freemsg(head);
1390 			return (NULL);
1391 		}
1392 		nbp = nbp->b_cont;
1393 		bp = bp->b_cont;
1394 	}
1395 	return (head);
1396 }
1397 
1398 /*
1399  * Copy data from message and data block to newly allocated message and
1400  * data block. Returns new message block pointer, or NULL if error.
1401  * The alignment of rptr (w.r.t. word alignment) will be the same in the copy
1402  * as in the original even when db_base is not word aligned. (bug 1052877)
1403  */
1404 mblk_t *
1405 copyb(mblk_t *bp)
1406 {
1407 	mblk_t	*nbp;
1408 	dblk_t	*dp, *ndp;
1409 	uchar_t *base;
1410 	size_t	size;
1411 	size_t	unaligned;
1412 
1413 	ASSERT(bp->b_wptr >= bp->b_rptr);
1414 
1415 	dp = bp->b_datap;
1416 	if (dp->db_fthdr != NULL)
1417 		STR_FTEVENT_MBLK(bp, caller(), FTEV_COPYB, 0);
1418 
1419 	/*
1420 	 * Special handling for Multidata message; this should be
1421 	 * removed once a copy-callback routine is made available.
1422 	 */
1423 	if (dp->db_type == M_MULTIDATA) {
1424 		cred_t *cr;
1425 
1426 		if ((nbp = mmd_copy(bp, KM_NOSLEEP)) == NULL)
1427 			return (NULL);
1428 
1429 		nbp->b_flag = bp->b_flag;
1430 		nbp->b_band = bp->b_band;
1431 		ndp = nbp->b_datap;
1432 
1433 		/* See comments below on potential issues. */
1434 		STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);
1435 
1436 		ASSERT(ndp->db_type == dp->db_type);
1437 		cr = dp->db_credp;
1438 		if (cr != NULL)
1439 			crhold(ndp->db_credp = cr);
1440 		ndp->db_cpid = dp->db_cpid;
1441 		return (nbp);
1442 	}
1443 
1444 	size = dp->db_lim - dp->db_base;
1445 	unaligned = P2PHASE((uintptr_t)dp->db_base, sizeof (uint_t));
1446 	if ((nbp = allocb_tmpl(size + unaligned, bp)) == NULL)
1447 		return (NULL);
1448 	nbp->b_flag = bp->b_flag;
1449 	nbp->b_band = bp->b_band;
1450 	ndp = nbp->b_datap;
1451 
1452 	/*
1453 	 * Well, here is a potential issue.  If we are trying to
1454 	 * trace a flow, and we copy the message, we might lose
1455 	 * information about where this message might have been.
1456 	 * So we should inherit the FT data.  On the other hand,
1457 	 * a user might be interested only in alloc to free data.
1458 	 * So I guess the real answer is to provide a tunable.
1459 	 */
1460 	STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);
1461 
1462 	base = ndp->db_base + unaligned;
1463 	bcopy(dp->db_base, ndp->db_base + unaligned, size);
1464 
1465 	nbp->b_rptr = base + (bp->b_rptr - dp->db_base);
1466 	nbp->b_wptr = nbp->b_rptr + MBLKL(bp);
1467 
1468 	return (nbp);
1469 }
1470 
1471 /*
1472  * Copy data from message to newly allocated message using new
1473  * data blocks.  Returns a pointer to the new message, or NULL if error.
1474  */
1475 mblk_t *
1476 copymsg(mblk_t *bp)
1477 {
1478 	mblk_t *head, *nbp;
1479 
1480 	if (!bp || !(nbp = head = copyb(bp)))
1481 		return (NULL);
1482 
1483 	while (bp->b_cont) {
1484 		if (!(nbp->b_cont = copyb(bp->b_cont))) {
1485 			freemsg(head);
1486 			return (NULL);
1487 		}
1488 		nbp = nbp->b_cont;
1489 		bp = bp->b_cont;
1490 	}
1491 	return (head);
1492 }
1493 
1494 /*
1495  * link a message block to tail of message
1496  */
1497 void
1498 linkb(mblk_t *mp, mblk_t *bp)
1499 {
1500 	ASSERT(mp && bp);
1501 
1502 	for (; mp->b_cont; mp = mp->b_cont)
1503 		;
1504 	mp->b_cont = bp;
1505 }
1506 
1507 /*
1508  * unlink a message block from head of message
1509  * return pointer to new message.
1510  * NULL if message becomes empty.
1511  */
1512 mblk_t *
1513 unlinkb(mblk_t *bp)
1514 {
1515 	mblk_t *bp1;
1516 
1517 	bp1 = bp->b_cont;
1518 	bp->b_cont = NULL;
1519 	return (bp1);
1520 }
1521 
1522 /*
1523  * remove a message block "bp" from message "mp"
1524  *
1525  * Return pointer to new message or NULL if no message remains.
1526  * Return -1 if bp is not found in message.
1527  */
1528 mblk_t *
1529 rmvb(mblk_t *mp, mblk_t *bp)
1530 {
1531 	mblk_t *tmp;
1532 	mblk_t *lastp = NULL;
1533 
1534 	ASSERT(mp && bp);
1535 	for (tmp = mp; tmp; tmp = tmp->b_cont) {
1536 		if (tmp == bp) {
1537 			if (lastp)
1538 				lastp->b_cont = tmp->b_cont;
1539 			else
1540 				mp = tmp->b_cont;
1541 			tmp->b_cont = NULL;
1542 			return (mp);
1543 		}
1544 		lastp = tmp;
1545 	}
1546 	return ((mblk_t *)-1);
1547 }
1548 
1549 /*
1550  * Concatenate and align first len bytes of common
1551  * message type.  Len == -1, means concat everything.
1552  * Returns 1 on success, 0 on failure
1553  * After the pullup, mp points to the pulled up data.
1554  */
1555 int
1556 pullupmsg(mblk_t *mp, ssize_t len)
1557 {
1558 	mblk_t *bp, *b_cont;
1559 	dblk_t *dbp;
1560 	ssize_t n;
1561 
1562 	ASSERT(mp->b_datap->db_ref > 0);
1563 	ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
1564 
1565 	/*
1566 	 * We won't handle Multidata message, since it contains
1567 	 * metadata which this function has no knowledge of; we
1568 	 * assert on DEBUG, and return failure otherwise.
1569 	 */
1570 	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
1571 	if (mp->b_datap->db_type == M_MULTIDATA)
1572 		return (0);
1573 
1574 	if (len == -1) {
1575 		if (mp->b_cont == NULL && str_aligned(mp->b_rptr))
1576 			return (1);
1577 		len = xmsgsize(mp);
1578 	} else {
1579 		ssize_t first_mblk_len = mp->b_wptr - mp->b_rptr;
1580 		ASSERT(first_mblk_len >= 0);
1581 		/*
1582 		 * If the length is less than that of the first mblk,
1583 		 * we want to pull up the message into an aligned mblk.
1584 		 * Though not part of the spec, some callers assume it.
1585 		 */
1586 		if (len <= first_mblk_len) {
1587 			if (str_aligned(mp->b_rptr))
1588 				return (1);
1589 			len = first_mblk_len;
1590 		} else if (xmsgsize(mp) < len)
1591 			return (0);
1592 	}
1593 
1594 	if ((bp = allocb_tmpl(len, mp)) == NULL)
1595 		return (0);
1596 
1597 	dbp = bp->b_datap;
1598 	*bp = *mp;		/* swap mblks so bp heads the old msg... */
1599 	mp->b_datap = dbp;	/* ... and mp heads the new message */
1600 	mp->b_datap->db_mblk = mp;
1601 	bp->b_datap->db_mblk = bp;
1602 	mp->b_rptr = mp->b_wptr = dbp->db_base;
1603 
1604 	do {
1605 		ASSERT(bp->b_datap->db_ref > 0);
1606 		ASSERT(bp->b_wptr >= bp->b_rptr);
1607 		n = MIN(bp->b_wptr - bp->b_rptr, len);
1608 		bcopy(bp->b_rptr, mp->b_wptr, (size_t)n);
1609 		mp->b_wptr += n;
1610 		bp->b_rptr += n;
1611 		len -= n;
1612 		if (bp->b_rptr != bp->b_wptr)
1613 			break;
1614 		b_cont = bp->b_cont;
1615 		freeb(bp);
1616 		bp = b_cont;
1617 	} while (len && bp);
1618 
1619 	mp->b_cont = bp;	/* tack on whatever wasn't pulled up */
1620 
1621 	return (1);
1622 }
1623 
1624 /*
1625  * Concatenate and align at least the first len bytes of common message
1626  * type.  Len == -1 means concatenate everything.  The original message is
1627  * unaltered.  Returns a pointer to a new message on success, otherwise
1628  * returns NULL.
1629  */
1630 mblk_t *
1631 msgpullup(mblk_t *mp, ssize_t len)
1632 {
1633 	mblk_t	*newmp;
1634 	ssize_t	totlen;
1635 	ssize_t	n;
1636 
1637 	/*
1638 	 * We won't handle Multidata message, since it contains
1639 	 * metadata which this function has no knowledge of; we
1640 	 * assert on DEBUG, and return failure otherwise.
1641 	 */
1642 	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
1643 	if (mp->b_datap->db_type == M_MULTIDATA)
1644 		return (NULL);
1645 
1646 	totlen = xmsgsize(mp);
1647 
1648 	if ((len > 0) && (len > totlen))
1649 		return (NULL);
1650 
1651 	/*
1652 	 * Copy all of the first msg type into one new mblk, then dupmsg
1653 	 * and link the rest onto this.
1654 	 */
1655 
1656 	len = totlen;
1657 
1658 	if ((newmp = allocb_tmpl(len, mp)) == NULL)
1659 		return (NULL);
1660 
1661 	newmp->b_flag = mp->b_flag;
1662 	newmp->b_band = mp->b_band;
1663 
1664 	while (len > 0) {
1665 		n = mp->b_wptr - mp->b_rptr;
1666 		ASSERT(n >= 0);		/* allow zero-length mblk_t's */
1667 		if (n > 0)
1668 			bcopy(mp->b_rptr, newmp->b_wptr, n);
1669 		newmp->b_wptr += n;
1670 		len -= n;
1671 		mp = mp->b_cont;
1672 	}
1673 
1674 	if (mp != NULL) {
1675 		newmp->b_cont = dupmsg(mp);
1676 		if (newmp->b_cont == NULL) {
1677 			freemsg(newmp);
1678 			return (NULL);
1679 		}
1680 	}
1681 
1682 	return (newmp);
1683 }
1684 
1685 /*
1686  * Trim bytes from message
1687  *  len > 0, trim from head
1688  *  len < 0, trim from tail
1689  * Returns 1 on success, 0 on failure.
1690  */
1691 int
1692 adjmsg(mblk_t *mp, ssize_t len)
1693 {
1694 	mblk_t *bp;
1695 	mblk_t *save_bp = NULL;
1696 	mblk_t *prev_bp;
1697 	mblk_t *bcont;
1698 	unsigned char type;
1699 	ssize_t n;
1700 	int fromhead;
1701 	int first;
1702 
1703 	ASSERT(mp != NULL);
1704 	/*
1705 	 * We won't handle Multidata message, since it contains
1706 	 * metadata which this function has no knowledge of; we
1707 	 * assert on DEBUG, and return failure otherwise.
1708 	 */
1709 	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
1710 	if (mp->b_datap->db_type == M_MULTIDATA)
1711 		return (0);
1712 
1713 	if (len < 0) {
1714 		fromhead = 0;
1715 		len = -len;
1716 	} else {
1717 		fromhead = 1;
1718 	}
1719 
1720 	if (xmsgsize(mp) < len)
1721 		return (0);
1722 
1723 	if (fromhead) {
1724 		first = 1;
1725 		while (len) {
1726 			ASSERT(mp->b_wptr >= mp->b_rptr);
1727 			n = MIN(mp->b_wptr - mp->b_rptr, len);
1728 			mp->b_rptr += n;
1729 			len -= n;
1730 
1731 			/*
1732 			 * If this is not the first zero length
1733 			 * message remove it
1734 			 */
1735 			if (!first && (mp->b_wptr == mp->b_rptr)) {
1736 				bcont = mp->b_cont;
1737 				freeb(mp);
1738 				mp = save_bp->b_cont = bcont;
1739 			} else {
1740 				save_bp = mp;
1741 				mp = mp->b_cont;
1742 			}
1743 			first = 0;
1744 		}
1745 	} else {
1746 		type = mp->b_datap->db_type;
1747 		while (len) {
1748 			bp = mp;
1749 			save_bp = NULL;
1750 
1751 			/*
1752 			 * Find the last message of same type
1753 			 */
1754 			while (bp && bp->b_datap->db_type == type) {
1755 				ASSERT(bp->b_wptr >= bp->b_rptr);
1756 				prev_bp = save_bp;
1757 				save_bp = bp;
1758 				bp = bp->b_cont;
1759 			}
1760 			if (save_bp == NULL)
1761 				break;
1762 			n = MIN(save_bp->b_wptr - save_bp->b_rptr, len);
1763 			save_bp->b_wptr -= n;
1764 			len -= n;
1765 
1766 			/*
1767 			 * If this is not the first message
1768 			 * and we have taken away everything
1769 			 * from this message, remove it
1770 			 */
1771 
1772 			if ((save_bp != mp) &&
1773 			    (save_bp->b_wptr == save_bp->b_rptr)) {
1774 				bcont = save_bp->b_cont;
1775 				freeb(save_bp);
1776 				prev_bp->b_cont = bcont;
1777 			}
1778 		}
1779 	}
1780 	return (1);
1781 }
1782 
1783 /*
1784  * get number of data bytes in message
1785  */
1786 size_t
1787 msgdsize(mblk_t *bp)
1788 {
1789 	size_t count = 0;
1790 
1791 	for (; bp; bp = bp->b_cont)
1792 		if (bp->b_datap->db_type == M_DATA) {
1793 			ASSERT(bp->b_wptr >= bp->b_rptr);
1794 			count += bp->b_wptr - bp->b_rptr;
1795 		}
1796 	return (count);
1797 }
1798 
1799 /*
1800  * Get a message off head of queue
1801  *
1802  * If queue has no buffers then mark queue
1803  * with QWANTR. (queue wants to be read by
1804  * someone when data becomes available)
1805  *
1806  * If there is something to take off then do so.
1807  * If queue falls below hi water mark turn off QFULL
1808  * flag.  Decrement weighted count of queue.
1809  * Also turn off QWANTR because queue is being read.
1810  *
1811  * The queue count is maintained on a per-band basis.
1812  * Priority band 0 (normal messages) uses q_count,
1813  * q_lowat, etc.  Non-zero priority bands use the
1814  * fields in their respective qband structures
1815  * (qb_count, qb_lowat, etc.)  All messages appear
1816  * on the same list, linked via their b_next pointers.
1817  * q_first is the head of the list.  q_count does
1818  * not reflect the size of all the messages on the
1819  * queue.  It only reflects those messages in the
1820  * normal band of flow.  The one exception to this
1821  * deals with high priority messages.  They are in
1822  * their own conceptual "band", but are accounted
1823  * against q_count.
1824  *
1825  * If queue count is below the lo water mark and QWANTW
1826  * is set, enable the closest backq which has a service
1827  * procedure and turn off the QWANTW flag.
1828  *
1829  * getq could be built on top of rmvq, but isn't because
1830  * of performance considerations.
1831  *
1832  * A note on the use of q_count and q_mblkcnt:
1833  *   q_count is the traditional byte count for messages that
1834  *   have been put on a queue.  Documentation tells us that
1835  *   we shouldn't rely on that count, but some drivers/modules
1836  *   do.  What was needed, however, is a mechanism to prevent
1837  *   runaway streams from consuming all of the resources,
1838  *   and particularly be able to flow control zero-length
1839  *   messages.  q_mblkcnt is used for this purpose.  It
1840  *   counts the number of mblk's that are being put on
1841  *   the queue.  The intention here, is that each mblk should
1842  *   contain one byte of data and, for the purpose of
1843  *   flow-control, logically does.  A queue will become
1844  *   full when EITHER of these values (q_count and q_mblkcnt)
1845  *   reach the highwater mark.  It will clear when BOTH
1846  *   of them drop below the highwater mark.  And it will
1847  *   backenable when BOTH of them drop below the lowwater
1848  *   mark.
1849  *   With this algorithm, a driver/module might be able
1850  *   to find a reasonably accurate q_count, and the
1851  *   framework can still try and limit resource usage.
1852  */
1853 mblk_t *
1854 getq(queue_t *q)
1855 {
1856 	mblk_t *bp;
1857 	uchar_t band = 0;
1858 
1859 	bp = getq_noenab(q, 0);
1860 	if (bp != NULL)
1861 		band = bp->b_band;
1862 
1863 	/*
1864 	 * Inlined from qbackenable().
1865 	 * Quick check without holding the lock.
1866 	 */
1867 	if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
1868 		return (bp);
1869 
1870 	qbackenable(q, band);
1871 	return (bp);
1872 }
1873 
/*
 * Add to 'size' the number of data bytes in the single message block
 * 'mp': MBLKL(mp) for ordinary blocks, or the payload-in-use figure
 * reported by mmd_getsize() for Multidata blocks.
 *
 * The body is wrapped in do { } while (0) so that the macro expands to
 * exactly one statement and can be used safely in unbraced if/else
 * bodies.  Invoke it with a trailing semicolon.
 */

#define	ADD_MBLK_SIZE(mp, size) 					\
do {									\
	if (DB_TYPE(mp) != M_MULTIDATA) {				\
		(size) += MBLKL(mp);					\
	} else {							\
		uint_t	pinuse;						\
									\
		mmd_getsize(mmd_getmultidata(mp), NULL, &pinuse);	\
		(size) += pinuse;					\
	}								\
} while (0)
1888 
1889 /*
1890  * Returns the number of bytes in a message (a message is defined as a
1891  * chain of mblks linked by b_cont). If a non-NULL mblkcnt is supplied we
1892  * also return the number of distinct mblks in the message.
1893  */
1894 int
1895 mp_cont_len(mblk_t *bp, int *mblkcnt)
1896 {
1897 	mblk_t	*mp;
1898 	int	mblks = 0;
1899 	int	bytes = 0;
1900 
1901 	for (mp = bp; mp != NULL; mp = mp->b_cont) {
1902 		ADD_MBLK_SIZE(mp, bytes);
1903 		mblks++;
1904 	}
1905 
1906 	if (mblkcnt != NULL)
1907 		*mblkcnt = mblks;
1908 
1909 	return (bytes);
1910 }
1911 
1912 /*
1913  * Like getq() but does not backenable.  This is used by the stream
1914  * head when a putback() is likely.  The caller must call qbackenable()
1915  * after it is done with accessing the queue.
1916  * The rbytes arguments to getq_noneab() allows callers to specify a
1917  * the maximum number of bytes to return. If the current amount on the
1918  * queue is less than this then the entire message will be returned.
1919  * A value of 0 returns the entire message and is equivalent to the old
1920  * default behaviour prior to the addition of the rbytes argument.
1921  */
1922 mblk_t *
1923 getq_noenab(queue_t *q, ssize_t rbytes)
1924 {
1925 	mblk_t *bp, *mp1;
1926 	mblk_t *mp2 = NULL;
1927 	qband_t *qbp;
1928 	kthread_id_t freezer;
1929 	int	bytecnt = 0, mblkcnt = 0;
1930 
1931 	/* freezestr should allow its caller to call getq/putq */
1932 	freezer = STREAM(q)->sd_freezer;
1933 	if (freezer == curthread) {
1934 		ASSERT(frozenstr(q));
1935 		ASSERT(MUTEX_HELD(QLOCK(q)));
1936 	} else
1937 		mutex_enter(QLOCK(q));
1938 
1939 	if ((bp = q->q_first) == 0) {
1940 		q->q_flag |= QWANTR;
1941 	} else {
1942 		/*
1943 		 * If the caller supplied a byte threshold and there is
1944 		 * more than this amount on the queue then break up the
1945 		 * the message appropriately.  We can only safely do
1946 		 * this for M_DATA messages.
1947 		 */
1948 		if ((DB_TYPE(bp) == M_DATA) && (rbytes > 0) &&
1949 		    (q->q_count > rbytes)) {
1950 			/*
1951 			 * Inline version of mp_cont_len() which terminates
1952 			 * when we meet or exceed rbytes.
1953 			 */
1954 			for (mp1 = bp; mp1 != NULL; mp1 = mp1->b_cont) {
1955 				mblkcnt++;
1956 				ADD_MBLK_SIZE(mp1, bytecnt);
1957 				if (bytecnt  >= rbytes)
1958 					break;
1959 			}
1960 			/*
1961 			 * We need to account for the following scenarios:
1962 			 *
1963 			 * 1) Too much data in the first message:
1964 			 *	mp1 will be the mblk which puts us over our
1965 			 *	byte limit.
1966 			 * 2) Not enough data in the first message:
1967 			 *	mp1 will be NULL.
1968 			 * 3) Exactly the right amount of data contained within
1969 			 *    whole mblks:
1970 			 *	mp1->b_cont will be where we break the message.
1971 			 */
1972 			if (bytecnt > rbytes) {
1973 				/*
1974 				 * Dup/copy mp1 and put what we don't need
1975 				 * back onto the queue. Adjust the read/write
1976 				 * and continuation pointers appropriately
1977 				 * and decrement the current mblk count to
1978 				 * reflect we are putting an mblk back onto
1979 				 * the queue.
1980 				 * When adjusting the message pointers, it's
1981 				 * OK to use the existing bytecnt and the
1982 				 * requested amount (rbytes) to calculate the
1983 				 * the new write offset (b_wptr) of what we
1984 				 * are taking. However, we  cannot use these
1985 				 * values when calculating the read offset of
1986 				 * the mblk we are putting back on the queue.
1987 				 * This is because the begining (b_rptr) of the
1988 				 * mblk represents some arbitrary point within
1989 				 * the message.
1990 				 * It's simplest to do this by advancing b_rptr
1991 				 * by the new length of mp1 as we don't have to
1992 				 * remember any intermediate state.
1993 				 */
1994 				ASSERT(mp1 != NULL);
1995 				mblkcnt--;
1996 				if ((mp2 = dupb(mp1)) == NULL &&
1997 				    (mp2 = copyb(mp1)) == NULL) {
1998 					bytecnt = mblkcnt = 0;
1999 					goto dup_failed;
2000 				}
2001 				mp2->b_cont = mp1->b_cont;
2002 				mp1->b_wptr -= bytecnt - rbytes;
2003 				mp2->b_rptr += mp1->b_wptr - mp1->b_rptr;
2004 				mp1->b_cont = NULL;
2005 				bytecnt = rbytes;
2006 			} else {
2007 				/*
2008 				 * Either there is not enough data in the first
2009 				 * message or there is no excess data to deal
2010 				 * with. If mp1 is NULL, we are taking the
2011 				 * whole message. No need to do anything.
2012 				 * Otherwise we assign mp1->b_cont to mp2 as
2013 				 * we will be putting this back onto the head of
2014 				 * the queue.
2015 				 */
2016 				if (mp1 != NULL) {
2017 					mp2 = mp1->b_cont;
2018 					mp1->b_cont = NULL;
2019 				}
2020 			}
2021 			/*
2022 			 * If mp2 is not NULL then we have part of the message
2023 			 * to put back onto the queue.
2024 			 */
2025 			if (mp2 != NULL) {
2026 				if ((mp2->b_next = bp->b_next) == NULL)
2027 					q->q_last = mp2;
2028 				else
2029 					bp->b_next->b_prev = mp2;
2030 				q->q_first = mp2;
2031 			} else {
2032 				if ((q->q_first = bp->b_next) == NULL)
2033 					q->q_last = NULL;
2034 				else
2035 					q->q_first->b_prev = NULL;
2036 			}
2037 		} else {
2038 			/*
2039 			 * Either no byte threshold was supplied, there is
2040 			 * not enough on the queue or we failed to
2041 			 * duplicate/copy a data block. In these cases we
2042 			 * just take the entire first message.
2043 			 */
2044 dup_failed:
2045 			bytecnt = mp_cont_len(bp, &mblkcnt);
2046 			if ((q->q_first = bp->b_next) == NULL)
2047 				q->q_last = NULL;
2048 			else
2049 				q->q_first->b_prev = NULL;
2050 		}
2051 		if (bp->b_band == 0) {
2052 			q->q_count -= bytecnt;
2053 			q->q_mblkcnt -= mblkcnt;
2054 			if (q->q_mblkcnt == 0 || ((q->q_count < q->q_hiwat) &&
2055 			    (q->q_mblkcnt < q->q_hiwat))) {
2056 				q->q_flag &= ~QFULL;
2057 			}
2058 		} else {
2059 			int i;
2060 
2061 			ASSERT(bp->b_band <= q->q_nband);
2062 			ASSERT(q->q_bandp != NULL);
2063 			ASSERT(MUTEX_HELD(QLOCK(q)));
2064 			qbp = q->q_bandp;
2065 			i = bp->b_band;
2066 			while (--i > 0)
2067 				qbp = qbp->qb_next;
2068 			if (qbp->qb_first == qbp->qb_last) {
2069 				qbp->qb_first = NULL;
2070 				qbp->qb_last = NULL;
2071 			} else {
2072 				qbp->qb_first = bp->b_next;
2073 			}
2074 			qbp->qb_count -= bytecnt;
2075 			qbp->qb_mblkcnt -= mblkcnt;
2076 			if (qbp->qb_mblkcnt == 0 ||
2077 			    ((qbp->qb_count < qbp->qb_hiwat) &&
2078 			    (qbp->qb_mblkcnt < qbp->qb_hiwat))) {
2079 				qbp->qb_flag &= ~QB_FULL;
2080 			}
2081 		}
2082 		q->q_flag &= ~QWANTR;
2083 		bp->b_next = NULL;
2084 		bp->b_prev = NULL;
2085 	}
2086 	if (freezer != curthread)
2087 		mutex_exit(QLOCK(q));
2088 
2089 	STR_FTEVENT_MSG(bp, q, FTEV_GETQ, NULL);
2090 
2091 	return (bp);
2092 }
2093 
2094 /*
2095  * Determine if a backenable is needed after removing a message in the
2096  * specified band.
2097  * NOTE: This routine assumes that something like getq_noenab() has been
2098  * already called.
2099  *
2100  * For the read side it is ok to hold sd_lock across calling this (and the
2101  * stream head often does).
2102  * But for the write side strwakeq might be invoked and it acquires sd_lock.
2103  */
2104 void
2105 qbackenable(queue_t *q, uchar_t band)
2106 {
2107 	int backenab = 0;
2108 	qband_t *qbp;
2109 	kthread_id_t freezer;
2110 
2111 	ASSERT(q);
2112 	ASSERT((q->q_flag & QREADR) || MUTEX_NOT_HELD(&STREAM(q)->sd_lock));
2113 
2114 	/*
2115 	 * Quick check without holding the lock.
2116 	 * OK since after getq() has lowered the q_count these flags
2117 	 * would not change unless either the qbackenable() is done by
2118 	 * another thread (which is ok) or the queue has gotten QFULL
2119 	 * in which case another backenable will take place when the queue
2120 	 * drops below q_lowat.
2121 	 */
2122 	if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
2123 		return;
2124 
2125 	/* freezestr should allow its caller to call getq/putq */
2126 	freezer = STREAM(q)->sd_freezer;
2127 	if (freezer == curthread) {
2128 		ASSERT(frozenstr(q));
2129 		ASSERT(MUTEX_HELD(QLOCK(q)));
2130 	} else
2131 		mutex_enter(QLOCK(q));
2132 
2133 	if (band == 0) {
2134 		if (q->q_lowat == 0 || (q->q_count < q->q_lowat &&
2135 		    q->q_mblkcnt < q->q_lowat)) {
2136 			backenab = q->q_flag & (QWANTW|QWANTWSYNC);
2137 		}
2138 	} else {
2139 		int i;
2140 
2141 		ASSERT((unsigned)band <= q->q_nband);
2142 		ASSERT(q->q_bandp != NULL);
2143 
2144 		qbp = q->q_bandp;
2145 		i = band;
2146 		while (--i > 0)
2147 			qbp = qbp->qb_next;
2148 
2149 		if (qbp->qb_lowat == 0 || (qbp->qb_count < qbp->qb_lowat &&
2150 		    qbp->qb_mblkcnt < qbp->qb_lowat)) {
2151 			backenab = qbp->qb_flag & QB_WANTW;
2152 		}
2153 	}
2154 
2155 	if (backenab == 0) {
2156 		if (freezer != curthread)
2157 			mutex_exit(QLOCK(q));
2158 		return;
2159 	}
2160 
2161 	/* Have to drop the lock across strwakeq and backenable */
2162 	if (backenab & QWANTWSYNC)
2163 		q->q_flag &= ~QWANTWSYNC;
2164 	if (backenab & (QWANTW|QB_WANTW)) {
2165 		if (band != 0)
2166 			qbp->qb_flag &= ~QB_WANTW;
2167 		else {
2168 			q->q_flag &= ~QWANTW;
2169 		}
2170 	}
2171 
2172 	if (freezer != curthread)
2173 		mutex_exit(QLOCK(q));
2174 
2175 	if (backenab & QWANTWSYNC)
2176 		strwakeq(q, QWANTWSYNC);
2177 	if (backenab & (QWANTW|QB_WANTW))
2178 		backenable(q, band);
2179 }
2180 
2181 /*
2182  * Remove a message from a queue.  The queue count and other
2183  * flow control parameters are adjusted and the back queue
2184  * enabled if necessary.
2185  *
2186  * rmvq can be called with the stream frozen, but other utility functions
2187  * holding QLOCK, and by streams modules without any locks/frozen.
2188  */
2189 void
2190 rmvq(queue_t *q, mblk_t *mp)
2191 {
2192 	ASSERT(mp != NULL);
2193 
2194 	rmvq_noenab(q, mp);
2195 	if (curthread != STREAM(q)->sd_freezer && MUTEX_HELD(QLOCK(q))) {
2196 		/*
2197 		 * qbackenable can handle a frozen stream but not a "random"
2198 		 * qlock being held. Drop lock across qbackenable.
2199 		 */
2200 		mutex_exit(QLOCK(q));
2201 		qbackenable(q, mp->b_band);
2202 		mutex_enter(QLOCK(q));
2203 	} else {
2204 		qbackenable(q, mp->b_band);
2205 	}
2206 }
2207 
2208 /*
2209  * Like rmvq() but without any backenabling.
2210  * This exists to handle SR_CONSOL_DATA in strrput().
2211  */
2212 void
2213 rmvq_noenab(queue_t *q, mblk_t *mp)
2214 {
2215 	int i;
2216 	qband_t *qbp = NULL;
2217 	kthread_id_t freezer;
2218 	int	bytecnt = 0, mblkcnt = 0;
2219 
2220 	freezer = STREAM(q)->sd_freezer;
2221 	if (freezer == curthread) {
2222 		ASSERT(frozenstr(q));
2223 		ASSERT(MUTEX_HELD(QLOCK(q)));
2224 	} else if (MUTEX_HELD(QLOCK(q))) {
2225 		/* Don't drop lock on exit */
2226 		freezer = curthread;
2227 	} else
2228 		mutex_enter(QLOCK(q));
2229 
2230 	ASSERT(mp->b_band <= q->q_nband);
2231 	if (mp->b_band != 0) {		/* Adjust band pointers */
2232 		ASSERT(q->q_bandp != NULL);
2233 		qbp = q->q_bandp;
2234 		i = mp->b_band;
2235 		while (--i > 0)
2236 			qbp = qbp->qb_next;
2237 		if (mp == qbp->qb_first) {
2238 			if (mp->b_next && mp->b_band == mp->b_next->b_band)
2239 				qbp->qb_first = mp->b_next;
2240 			else
2241 				qbp->qb_first = NULL;
2242 		}
2243 		if (mp == qbp->qb_last) {
2244 			if (mp->b_prev && mp->b_band == mp->b_prev->b_band)
2245 				qbp->qb_last = mp->b_prev;
2246 			else
2247 				qbp->qb_last = NULL;
2248 		}
2249 	}
2250 
2251 	/*
2252 	 * Remove the message from the list.
2253 	 */
2254 	if (mp->b_prev)
2255 		mp->b_prev->b_next = mp->b_next;
2256 	else
2257 		q->q_first = mp->b_next;
2258 	if (mp->b_next)
2259 		mp->b_next->b_prev = mp->b_prev;
2260 	else
2261 		q->q_last = mp->b_prev;
2262 	mp->b_next = NULL;
2263 	mp->b_prev = NULL;
2264 
2265 	/* Get the size of the message for q_count accounting */
2266 	bytecnt = mp_cont_len(mp, &mblkcnt);
2267 
2268 	if (mp->b_band == 0) {		/* Perform q_count accounting */
2269 		q->q_count -= bytecnt;
2270 		q->q_mblkcnt -= mblkcnt;
2271 		if (q->q_mblkcnt == 0 || ((q->q_count < q->q_hiwat) &&
2272 		    (q->q_mblkcnt < q->q_hiwat))) {
2273 			q->q_flag &= ~QFULL;
2274 		}
2275 	} else {			/* Perform qb_count accounting */
2276 		qbp->qb_count -= bytecnt;
2277 		qbp->qb_mblkcnt -= mblkcnt;
2278 		if (qbp->qb_mblkcnt == 0 || ((qbp->qb_count < qbp->qb_hiwat) &&
2279 		    (qbp->qb_mblkcnt < qbp->qb_hiwat))) {
2280 			qbp->qb_flag &= ~QB_FULL;
2281 		}
2282 	}
2283 	if (freezer != curthread)
2284 		mutex_exit(QLOCK(q));
2285 
2286 	STR_FTEVENT_MSG(mp, q, FTEV_RMVQ, NULL);
2287 }
2288 
2289 /*
2290  * Empty a queue.
2291  * If flag is set, remove all messages.  Otherwise, remove
2292  * only non-control messages.  If queue falls below its low
2293  * water mark, and QWANTW is set, enable the nearest upstream
2294  * service procedure.
2295  *
2296  * Historical note: when merging the M_FLUSH code in strrput with this
2297  * code one difference was discovered. flushq did not have a check
2298  * for q_lowat == 0 in the backenabling test.
2299  *
2300  * pcproto_flag specifies whether or not a M_PCPROTO message should be flushed
2301  * if one exists on the queue.
2302  */
2303 void
2304 flushq_common(queue_t *q, int flag, int pcproto_flag)
2305 {
2306 	mblk_t *mp, *nmp;
2307 	qband_t *qbp;
2308 	int backenab = 0;
2309 	unsigned char bpri;
2310 	unsigned char	qbf[NBAND];	/* band flushing backenable flags */
2311 
2312 	if (q->q_first == NULL)
2313 		return;
2314 
2315 	mutex_enter(QLOCK(q));
2316 	mp = q->q_first;
2317 	q->q_first = NULL;
2318 	q->q_last = NULL;
2319 	q->q_count = 0;
2320 	q->q_mblkcnt = 0;
2321 	for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2322 		qbp->qb_first = NULL;
2323 		qbp->qb_last = NULL;
2324 		qbp->qb_count = 0;
2325 		qbp->qb_mblkcnt = 0;
2326 		qbp->qb_flag &= ~QB_FULL;
2327 	}
2328 	q->q_flag &= ~QFULL;
2329 	mutex_exit(QLOCK(q));
2330 	while (mp) {
2331 		nmp = mp->b_next;
2332 		mp->b_next = mp->b_prev = NULL;
2333 
2334 		STR_FTEVENT_MBLK(mp, q, FTEV_FLUSHQ, NULL);
2335 
2336 		if (pcproto_flag && (mp->b_datap->db_type == M_PCPROTO))
2337 			(void) putq(q, mp);
2338 		else if (flag || datamsg(mp->b_datap->db_type))
2339 			freemsg(mp);
2340 		else
2341 			(void) putq(q, mp);
2342 		mp = nmp;
2343 	}
2344 	bpri = 1;
2345 	mutex_enter(QLOCK(q));
2346 	for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2347 		if ((qbp->qb_flag & QB_WANTW) &&
2348 		    (((qbp->qb_count < qbp->qb_lowat) &&
2349 		    (qbp->qb_mblkcnt < qbp->qb_lowat)) ||
2350 		    qbp->qb_lowat == 0)) {
2351 			qbp->qb_flag &= ~QB_WANTW;
2352 			backenab = 1;
2353 			qbf[bpri] = 1;
2354 		} else
2355 			qbf[bpri] = 0;
2356 		bpri++;
2357 	}
2358 	ASSERT(bpri == (unsigned char)(q->q_nband + 1));
2359 	if ((q->q_flag & QWANTW) &&
2360 	    (((q->q_count < q->q_lowat) &&
2361 	    (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
2362 		q->q_flag &= ~QWANTW;
2363 		backenab = 1;
2364 		qbf[0] = 1;
2365 	} else
2366 		qbf[0] = 0;
2367 
2368 	/*
2369 	 * If any band can now be written to, and there is a writer
2370 	 * for that band, then backenable the closest service procedure.
2371 	 */
2372 	if (backenab) {
2373 		mutex_exit(QLOCK(q));
2374 		for (bpri = q->q_nband; bpri != 0; bpri--)
2375 			if (qbf[bpri])
2376 				backenable(q, bpri);
2377 		if (qbf[0])
2378 			backenable(q, 0);
2379 	} else
2380 		mutex_exit(QLOCK(q));
2381 }
2382 
2383 /*
2384  * The real flushing takes place in flushq_common. This is done so that
2385  * a flag which specifies whether or not M_PCPROTO messages should be flushed
2386  * or not. Currently the only place that uses this flag is the stream head.
2387  */
2388 void
2389 flushq(queue_t *q, int flag)
2390 {
2391 	flushq_common(q, flag, 0);
2392 }
2393 
2394 /*
2395  * Flush the queue of messages of the given priority band.
2396  * There is some duplication of code between flushq and flushband.
2397  * This is because we want to optimize the code as much as possible.
2398  * The assumption is that there will be more messages in the normal
2399  * (priority 0) band than in any other.
2400  *
2401  * Historical note: when merging the M_FLUSH code in strrput with this
2402  * code one difference was discovered. flushband had an extra check for
2403  * did not have a check for (mp->b_datap->db_type < QPCTL) in the band 0
2404  * case. That check does not match the man page for flushband and was not
2405  * in the strrput flush code hence it was removed.
2406  */
2407 void
2408 flushband(queue_t *q, unsigned char pri, int flag)
2409 {
2410 	mblk_t *mp;
2411 	mblk_t *nmp;
2412 	mblk_t *last;
2413 	qband_t *qbp;
2414 	int band;
2415 
2416 	ASSERT((flag == FLUSHDATA) || (flag == FLUSHALL));
2417 	if (pri > q->q_nband) {
2418 		return;
2419 	}
2420 	mutex_enter(QLOCK(q));
2421 	if (pri == 0) {
2422 		mp = q->q_first;
2423 		q->q_first = NULL;
2424 		q->q_last = NULL;
2425 		q->q_count = 0;
2426 		q->q_mblkcnt = 0;
2427 		for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2428 			qbp->qb_first = NULL;
2429 			qbp->qb_last = NULL;
2430 			qbp->qb_count = 0;
2431 			qbp->qb_mblkcnt = 0;
2432 			qbp->qb_flag &= ~QB_FULL;
2433 		}
2434 		q->q_flag &= ~QFULL;
2435 		mutex_exit(QLOCK(q));
2436 		while (mp) {
2437 			nmp = mp->b_next;
2438 			mp->b_next = mp->b_prev = NULL;
2439 			if ((mp->b_band == 0) &&
2440 			    ((flag == FLUSHALL) ||
2441 			    datamsg(mp->b_datap->db_type)))
2442 				freemsg(mp);
2443 			else
2444 				(void) putq(q, mp);
2445 			mp = nmp;
2446 		}
2447 		mutex_enter(QLOCK(q));
2448 		if ((q->q_flag & QWANTW) &&
2449 		    (((q->q_count < q->q_lowat) &&
2450 		    (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
2451 			q->q_flag &= ~QWANTW;
2452 			mutex_exit(QLOCK(q));
2453 
2454 			backenable(q, pri);
2455 		} else
2456 			mutex_exit(QLOCK(q));
2457 	} else {	/* pri != 0 */
2458 		boolean_t flushed = B_FALSE;
2459 		band = pri;
2460 
2461 		ASSERT(MUTEX_HELD(QLOCK(q)));
2462 		qbp = q->q_bandp;
2463 		while (--band > 0)
2464 			qbp = qbp->qb_next;
2465 		mp = qbp->qb_first;
2466 		if (mp == NULL) {
2467 			mutex_exit(QLOCK(q));
2468 			return;
2469 		}
2470 		last = qbp->qb_last->b_next;
2471 		/*
2472 		 * rmvq_noenab() and freemsg() are called for each mblk that
2473 		 * meets the criteria.  The loop is executed until the last
2474 		 * mblk has been processed.
2475 		 */
2476 		while (mp != last) {
2477 			ASSERT(mp->b_band == pri);
2478 			nmp = mp->b_next;
2479 			if (flag == FLUSHALL || datamsg(mp->b_datap->db_type)) {
2480 				rmvq_noenab(q, mp);
2481 				freemsg(mp);
2482 				flushed = B_TRUE;
2483 			}
2484 			mp = nmp;
2485 		}
2486 		mutex_exit(QLOCK(q));
2487 
2488 		/*
2489 		 * If any mblk(s) has been freed, we know that qbackenable()
2490 		 * will need to be called.
2491 		 */
2492 		if (flushed)
2493 			qbackenable(q, pri);
2494 	}
2495 }
2496 
2497 /*
2498  * Return 1 if the queue is not full.  If the queue is full, return
2499  * 0 (may not put message) and set QWANTW flag (caller wants to write
2500  * to the queue).
2501  */
2502 int
2503 canput(queue_t *q)
2504 {
2505 	TRACE_1(TR_FAC_STREAMS_FR, TR_CANPUT_IN, "canput:%p", q);
2506 
2507 	/* this is for loopback transports, they should not do a canput */
2508 	ASSERT(STRMATED(q->q_stream) || STREAM(q) == STREAM(q->q_nfsrv));
2509 
2510 	/* Find next forward module that has a service procedure */
2511 	q = q->q_nfsrv;
2512 
2513 	if (!(q->q_flag & QFULL)) {
2514 		TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
2515 		return (1);
2516 	}
2517 	mutex_enter(QLOCK(q));
2518 	if (q->q_flag & QFULL) {
2519 		q->q_flag |= QWANTW;
2520 		mutex_exit(QLOCK(q));
2521 		TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 0);
2522 		return (0);
2523 	}
2524 	mutex_exit(QLOCK(q));
2525 	TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
2526 	return (1);
2527 }
2528 
2529 /*
2530  * This is the new canput for use with priority bands.  Return 1 if the
2531  * band is not full.  If the band is full, return 0 (may not put message)
2532  * and set QWANTW(QB_WANTW) flag for zero(non-zero) band (caller wants to
2533  * write to the queue).
2534  */
2535 int
2536 bcanput(queue_t *q, unsigned char pri)
2537 {
2538 	qband_t *qbp;
2539 
2540 	TRACE_2(TR_FAC_STREAMS_FR, TR_BCANPUT_IN, "bcanput:%p %p", q, pri);
2541 	if (!q)
2542 		return (0);
2543 
2544 	/* Find next forward module that has a service procedure */
2545 	q = q->q_nfsrv;
2546 
2547 	mutex_enter(QLOCK(q));
2548 	if (pri == 0) {
2549 		if (q->q_flag & QFULL) {
2550 			q->q_flag |= QWANTW;
2551 			mutex_exit(QLOCK(q));
2552 			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2553 			    "bcanput:%p %X %d", q, pri, 0);
2554 			return (0);
2555 		}
2556 	} else {	/* pri != 0 */
2557 		if (pri > q->q_nband) {
2558 			/*
2559 			 * No band exists yet, so return success.
2560 			 */
2561 			mutex_exit(QLOCK(q));
2562 			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2563 			    "bcanput:%p %X %d", q, pri, 1);
2564 			return (1);
2565 		}
2566 		qbp = q->q_bandp;
2567 		while (--pri)
2568 			qbp = qbp->qb_next;
2569 		if (qbp->qb_flag & QB_FULL) {
2570 			qbp->qb_flag |= QB_WANTW;
2571 			mutex_exit(QLOCK(q));
2572 			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2573 			    "bcanput:%p %X %d", q, pri, 0);
2574 			return (0);
2575 		}
2576 	}
2577 	mutex_exit(QLOCK(q));
2578 	TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2579 	    "bcanput:%p %X %d", q, pri, 1);
2580 	return (1);
2581 }
2582 
2583 /*
2584  * Put a message on a queue.
2585  *
2586  * Messages are enqueued on a priority basis.  The priority classes
2587  * are HIGH PRIORITY (type >= QPCTL), PRIORITY (type < QPCTL && band > 0),
2588  * and B_NORMAL (type < QPCTL && band == 0).
2589  *
2590  * Add appropriate weighted data block sizes to queue count.
2591  * If queue hits high water mark then set QFULL flag.
2592  *
2593  * If QNOENAB is not set (putq is allowed to enable the queue),
2594  * enable the queue only if the message is PRIORITY,
2595  * or the QWANTR flag is set (indicating that the service procedure
2596  * is ready to read the queue.  This implies that a service
2597  * procedure must NEVER put a high priority message back on its own
2598  * queue, as this would result in an infinite loop (!).
2599  */
2600 int
2601 putq(queue_t *q, mblk_t *bp)
2602 {
2603 	mblk_t *tmp;
2604 	qband_t *qbp = NULL;
2605 	int mcls = (int)queclass(bp);
2606 	kthread_id_t freezer;
2607 	int	bytecnt = 0, mblkcnt = 0;
2608 
2609 	freezer = STREAM(q)->sd_freezer;
2610 	if (freezer == curthread) {
2611 		ASSERT(frozenstr(q));
2612 		ASSERT(MUTEX_HELD(QLOCK(q)));
2613 	} else
2614 		mutex_enter(QLOCK(q));
2615 
2616 	/*
2617 	 * Make sanity checks and if qband structure is not yet
2618 	 * allocated, do so.
2619 	 */
2620 	if (mcls == QPCTL) {
2621 		if (bp->b_band != 0)
2622 			bp->b_band = 0;		/* force to be correct */
2623 	} else if (bp->b_band != 0) {
2624 		int i;
2625 		qband_t **qbpp;
2626 
2627 		if (bp->b_band > q->q_nband) {
2628 
2629 			/*
2630 			 * The qband structure for this priority band is
2631 			 * not on the queue yet, so we have to allocate
2632 			 * one on the fly.  It would be wasteful to
2633 			 * associate the qband structures with every
2634 			 * queue when the queues are allocated.  This is
2635 			 * because most queues will only need the normal
2636 			 * band of flow which can be described entirely
2637 			 * by the queue itself.
2638 			 */
2639 			qbpp = &q->q_bandp;
2640 			while (*qbpp)
2641 				qbpp = &(*qbpp)->qb_next;
2642 			while (bp->b_band > q->q_nband) {
2643 				if ((*qbpp = allocband()) == NULL) {
2644 					if (freezer != curthread)
2645 						mutex_exit(QLOCK(q));
2646 					return (0);
2647 				}
2648 				(*qbpp)->qb_hiwat = q->q_hiwat;
2649 				(*qbpp)->qb_lowat = q->q_lowat;
2650 				q->q_nband++;
2651 				qbpp = &(*qbpp)->qb_next;
2652 			}
2653 		}
2654 		ASSERT(MUTEX_HELD(QLOCK(q)));
2655 		qbp = q->q_bandp;
2656 		i = bp->b_band;
2657 		while (--i)
2658 			qbp = qbp->qb_next;
2659 	}
2660 
2661 	/*
2662 	 * If queue is empty, add the message and initialize the pointers.
2663 	 * Otherwise, adjust message pointers and queue pointers based on
2664 	 * the type of the message and where it belongs on the queue.  Some
2665 	 * code is duplicated to minimize the number of conditionals and
2666 	 * hopefully minimize the amount of time this routine takes.
2667 	 */
2668 	if (!q->q_first) {
2669 		bp->b_next = NULL;
2670 		bp->b_prev = NULL;
2671 		q->q_first = bp;
2672 		q->q_last = bp;
2673 		if (qbp) {
2674 			qbp->qb_first = bp;
2675 			qbp->qb_last = bp;
2676 		}
2677 	} else if (!qbp) {	/* bp->b_band == 0 */
2678 
2679 		/*
2680 		 * If queue class of message is less than or equal to
2681 		 * that of the last one on the queue, tack on to the end.
2682 		 */
2683 		tmp = q->q_last;
2684 		if (mcls <= (int)queclass(tmp)) {
2685 			bp->b_next = NULL;
2686 			bp->b_prev = tmp;
2687 			tmp->b_next = bp;
2688 			q->q_last = bp;
2689 		} else {
2690 			tmp = q->q_first;
2691 			while ((int)queclass(tmp) >= mcls)
2692 				tmp = tmp->b_next;
2693 
2694 			/*
2695 			 * Insert bp before tmp.
2696 			 */
2697 			bp->b_next = tmp;
2698 			bp->b_prev = tmp->b_prev;
2699 			if (tmp->b_prev)
2700 				tmp->b_prev->b_next = bp;
2701 			else
2702 				q->q_first = bp;
2703 			tmp->b_prev = bp;
2704 		}
2705 	} else {		/* bp->b_band != 0 */
2706 		if (qbp->qb_first) {
2707 			tmp = qbp->qb_last;
2708 
2709 			/*
2710 			 * Insert bp after the last message in this band.
2711 			 */
2712 			bp->b_next = tmp->b_next;
2713 			if (tmp->b_next)
2714 				tmp->b_next->b_prev = bp;
2715 			else
2716 				q->q_last = bp;
2717 			bp->b_prev = tmp;
2718 			tmp->b_next = bp;
2719 		} else {
2720 			tmp = q->q_last;
2721 			if ((mcls < (int)queclass(tmp)) ||
2722 			    (bp->b_band <= tmp->b_band)) {
2723 
2724 				/*
2725 				 * Tack bp on end of queue.
2726 				 */
2727 				bp->b_next = NULL;
2728 				bp->b_prev = tmp;
2729 				tmp->b_next = bp;
2730 				q->q_last = bp;
2731 			} else {
2732 				tmp = q->q_first;
2733 				while (tmp->b_datap->db_type >= QPCTL)
2734 					tmp = tmp->b_next;
2735 				while (tmp->b_band >= bp->b_band)
2736 					tmp = tmp->b_next;
2737 
2738 				/*
2739 				 * Insert bp before tmp.
2740 				 */
2741 				bp->b_next = tmp;
2742 				bp->b_prev = tmp->b_prev;
2743 				if (tmp->b_prev)
2744 					tmp->b_prev->b_next = bp;
2745 				else
2746 					q->q_first = bp;
2747 				tmp->b_prev = bp;
2748 			}
2749 			qbp->qb_first = bp;
2750 		}
2751 		qbp->qb_last = bp;
2752 	}
2753 
2754 	/* Get message byte count for q_count accounting */
2755 	bytecnt = mp_cont_len(bp, &mblkcnt);
2756 
2757 	if (qbp) {
2758 		qbp->qb_count += bytecnt;
2759 		qbp->qb_mblkcnt += mblkcnt;
2760 		if ((qbp->qb_count >= qbp->qb_hiwat) ||
2761 		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
2762 			qbp->qb_flag |= QB_FULL;
2763 		}
2764 	} else {
2765 		q->q_count += bytecnt;
2766 		q->q_mblkcnt += mblkcnt;
2767 		if ((q->q_count >= q->q_hiwat) ||
2768 		    (q->q_mblkcnt >= q->q_hiwat)) {
2769 			q->q_flag |= QFULL;
2770 		}
2771 	}
2772 
2773 	STR_FTEVENT_MSG(bp, q, FTEV_PUTQ, NULL);
2774 
2775 	if ((mcls > QNORM) ||
2776 	    (canenable(q) && (q->q_flag & QWANTR || bp->b_band)))
2777 		qenable_locked(q);
2778 	ASSERT(MUTEX_HELD(QLOCK(q)));
2779 	if (freezer != curthread)
2780 		mutex_exit(QLOCK(q));
2781 
2782 	return (1);
2783 }
2784 
2785 /*
2786  * Put stuff back at beginning of Q according to priority order.
2787  * See comment on putq above for details.
2788  */
2789 int
2790 putbq(queue_t *q, mblk_t *bp)
2791 {
2792 	mblk_t *tmp;
2793 	qband_t *qbp = NULL;
2794 	int mcls = (int)queclass(bp);
2795 	kthread_id_t freezer;
2796 	int	bytecnt = 0, mblkcnt = 0;
2797 
2798 	ASSERT(q && bp);
2799 	ASSERT(bp->b_next == NULL);
2800 	freezer = STREAM(q)->sd_freezer;
2801 	if (freezer == curthread) {
2802 		ASSERT(frozenstr(q));
2803 		ASSERT(MUTEX_HELD(QLOCK(q)));
2804 	} else
2805 		mutex_enter(QLOCK(q));
2806 
2807 	/*
2808 	 * Make sanity checks and if qband structure is not yet
2809 	 * allocated, do so.
2810 	 */
2811 	if (mcls == QPCTL) {
2812 		if (bp->b_band != 0)
2813 			bp->b_band = 0;		/* force to be correct */
2814 	} else if (bp->b_band != 0) {
2815 		int i;
2816 		qband_t **qbpp;
2817 
2818 		if (bp->b_band > q->q_nband) {
2819 			qbpp = &q->q_bandp;
2820 			while (*qbpp)
2821 				qbpp = &(*qbpp)->qb_next;
2822 			while (bp->b_band > q->q_nband) {
2823 				if ((*qbpp = allocband()) == NULL) {
2824 					if (freezer != curthread)
2825 						mutex_exit(QLOCK(q));
2826 					return (0);
2827 				}
2828 				(*qbpp)->qb_hiwat = q->q_hiwat;
2829 				(*qbpp)->qb_lowat = q->q_lowat;
2830 				q->q_nband++;
2831 				qbpp = &(*qbpp)->qb_next;
2832 			}
2833 		}
2834 		qbp = q->q_bandp;
2835 		i = bp->b_band;
2836 		while (--i)
2837 			qbp = qbp->qb_next;
2838 	}
2839 
2840 	/*
2841 	 * If queue is empty or if message is high priority,
2842 	 * place on the front of the queue.
2843 	 */
2844 	tmp = q->q_first;
2845 	if ((!tmp) || (mcls == QPCTL)) {
2846 		bp->b_next = tmp;
2847 		if (tmp)
2848 			tmp->b_prev = bp;
2849 		else
2850 			q->q_last = bp;
2851 		q->q_first = bp;
2852 		bp->b_prev = NULL;
2853 		if (qbp) {
2854 			qbp->qb_first = bp;
2855 			qbp->qb_last = bp;
2856 		}
2857 	} else if (qbp) {	/* bp->b_band != 0 */
2858 		tmp = qbp->qb_first;
2859 		if (tmp) {
2860 
2861 			/*
2862 			 * Insert bp before the first message in this band.
2863 			 */
2864 			bp->b_next = tmp;
2865 			bp->b_prev = tmp->b_prev;
2866 			if (tmp->b_prev)
2867 				tmp->b_prev->b_next = bp;
2868 			else
2869 				q->q_first = bp;
2870 			tmp->b_prev = bp;
2871 		} else {
2872 			tmp = q->q_last;
2873 			if ((mcls < (int)queclass(tmp)) ||
2874 			    (bp->b_band < tmp->b_band)) {
2875 
2876 				/*
2877 				 * Tack bp on end of queue.
2878 				 */
2879 				bp->b_next = NULL;
2880 				bp->b_prev = tmp;
2881 				tmp->b_next = bp;
2882 				q->q_last = bp;
2883 			} else {
2884 				tmp = q->q_first;
2885 				while (tmp->b_datap->db_type >= QPCTL)
2886 					tmp = tmp->b_next;
2887 				while (tmp->b_band > bp->b_band)
2888 					tmp = tmp->b_next;
2889 
2890 				/*
2891 				 * Insert bp before tmp.
2892 				 */
2893 				bp->b_next = tmp;
2894 				bp->b_prev = tmp->b_prev;
2895 				if (tmp->b_prev)
2896 					tmp->b_prev->b_next = bp;
2897 				else
2898 					q->q_first = bp;
2899 				tmp->b_prev = bp;
2900 			}
2901 			qbp->qb_last = bp;
2902 		}
2903 		qbp->qb_first = bp;
2904 	} else {		/* bp->b_band == 0 && !QPCTL */
2905 
2906 		/*
2907 		 * If the queue class or band is less than that of the last
2908 		 * message on the queue, tack bp on the end of the queue.
2909 		 */
2910 		tmp = q->q_last;
2911 		if ((mcls < (int)queclass(tmp)) || (bp->b_band < tmp->b_band)) {
2912 			bp->b_next = NULL;
2913 			bp->b_prev = tmp;
2914 			tmp->b_next = bp;
2915 			q->q_last = bp;
2916 		} else {
2917 			tmp = q->q_first;
2918 			while (tmp->b_datap->db_type >= QPCTL)
2919 				tmp = tmp->b_next;
2920 			while (tmp->b_band > bp->b_band)
2921 				tmp = tmp->b_next;
2922 
2923 			/*
2924 			 * Insert bp before tmp.
2925 			 */
2926 			bp->b_next = tmp;
2927 			bp->b_prev = tmp->b_prev;
2928 			if (tmp->b_prev)
2929 				tmp->b_prev->b_next = bp;
2930 			else
2931 				q->q_first = bp;
2932 			tmp->b_prev = bp;
2933 		}
2934 	}
2935 
2936 	/* Get message byte count for q_count accounting */
2937 	bytecnt = mp_cont_len(bp, &mblkcnt);
2938 
2939 	if (qbp) {
2940 		qbp->qb_count += bytecnt;
2941 		qbp->qb_mblkcnt += mblkcnt;
2942 		if ((qbp->qb_count >= qbp->qb_hiwat) ||
2943 		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
2944 			qbp->qb_flag |= QB_FULL;
2945 		}
2946 	} else {
2947 		q->q_count += bytecnt;
2948 		q->q_mblkcnt += mblkcnt;
2949 		if ((q->q_count >= q->q_hiwat) ||
2950 		    (q->q_mblkcnt >= q->q_hiwat)) {
2951 			q->q_flag |= QFULL;
2952 		}
2953 	}
2954 
2955 	STR_FTEVENT_MSG(bp, q, FTEV_PUTBQ, NULL);
2956 
2957 	if ((mcls > QNORM) || (canenable(q) && (q->q_flag & QWANTR)))
2958 		qenable_locked(q);
2959 	ASSERT(MUTEX_HELD(QLOCK(q)));
2960 	if (freezer != curthread)
2961 		mutex_exit(QLOCK(q));
2962 
2963 	return (1);
2964 }
2965 
2966 /*
2967  * Insert a message before an existing message on the queue.  If the
2968  * existing message is NULL, the new messages is placed on the end of
2969  * the queue.  The queue class of the new message is ignored.  However,
2970  * the priority band of the new message must adhere to the following
2971  * ordering:
2972  *
2973  *	emp->b_prev->b_band >= mp->b_band >= emp->b_band.
2974  *
2975  * All flow control parameters are updated.
2976  *
2977  * insq can be called with the stream frozen, but other utility functions
2978  * holding QLOCK, and by streams modules without any locks/frozen.
2979  */
2980 int
2981 insq(queue_t *q, mblk_t *emp, mblk_t *mp)
2982 {
2983 	mblk_t *tmp;
2984 	qband_t *qbp = NULL;
2985 	int mcls = (int)queclass(mp);
2986 	kthread_id_t freezer;
2987 	int	bytecnt = 0, mblkcnt = 0;
2988 
2989 	freezer = STREAM(q)->sd_freezer;
2990 	if (freezer == curthread) {
2991 		ASSERT(frozenstr(q));
2992 		ASSERT(MUTEX_HELD(QLOCK(q)));
2993 	} else if (MUTEX_HELD(QLOCK(q))) {
2994 		/* Don't drop lock on exit */
2995 		freezer = curthread;
2996 	} else
2997 		mutex_enter(QLOCK(q));
2998 
2999 	if (mcls == QPCTL) {
3000 		if (mp->b_band != 0)
3001 			mp->b_band = 0;		/* force to be correct */
3002 		if (emp && emp->b_prev &&
3003 		    (emp->b_prev->b_datap->db_type < QPCTL))
3004 			goto badord;
3005 	}
3006 	if (emp) {
3007 		if (((mcls == QNORM) && (mp->b_band < emp->b_band)) ||
3008 		    (emp->b_prev && (emp->b_prev->b_datap->db_type < QPCTL) &&
3009 		    (emp->b_prev->b_band < mp->b_band))) {
3010 			goto badord;
3011 		}
3012 	} else {
3013 		tmp = q->q_last;
3014 		if (tmp && (mcls == QNORM) && (mp->b_band > tmp->b_band)) {
3015 badord:
3016 			cmn_err(CE_WARN,
3017 			    "insq: attempt to insert message out of order "
3018 			    "on q %p", (void *)q);
3019 			if (freezer != curthread)
3020 				mutex_exit(QLOCK(q));
3021 			return (0);
3022 		}
3023 	}
3024 
3025 	if (mp->b_band != 0) {
3026 		int i;
3027 		qband_t **qbpp;
3028 
3029 		if (mp->b_band > q->q_nband) {
3030 			qbpp = &q->q_bandp;
3031 			while (*qbpp)
3032 				qbpp = &(*qbpp)->qb_next;
3033 			while (mp->b_band > q->q_nband) {
3034 				if ((*qbpp = allocband()) == NULL) {
3035 					if (freezer != curthread)
3036 						mutex_exit(QLOCK(q));
3037 					return (0);
3038 				}
3039 				(*qbpp)->qb_hiwat = q->q_hiwat;
3040 				(*qbpp)->qb_lowat = q->q_lowat;
3041 				q->q_nband++;
3042 				qbpp = &(*qbpp)->qb_next;
3043 			}
3044 		}
3045 		qbp = q->q_bandp;
3046 		i = mp->b_band;
3047 		while (--i)
3048 			qbp = qbp->qb_next;
3049 	}
3050 
3051 	if ((mp->b_next = emp) != NULL) {
3052 		if ((mp->b_prev = emp->b_prev) != NULL)
3053 			emp->b_prev->b_next = mp;
3054 		else
3055 			q->q_first = mp;
3056 		emp->b_prev = mp;
3057 	} else {
3058 		if ((mp->b_prev = q->q_last) != NULL)
3059 			q->q_last->b_next = mp;
3060 		else
3061 			q->q_first = mp;
3062 		q->q_last = mp;
3063 	}
3064 
3065 	/* Get mblk and byte count for q_count accounting */
3066 	bytecnt = mp_cont_len(mp, &mblkcnt);
3067 
3068 	if (qbp) {	/* adjust qband pointers and count */
3069 		if (!qbp->qb_first) {
3070 			qbp->qb_first = mp;
3071 			qbp->qb_last = mp;
3072 		} else {
3073 			if (mp->b_prev == NULL || (mp->b_prev != NULL &&
3074 			    (mp->b_prev->b_band != mp->b_band)))
3075 				qbp->qb_first = mp;
3076 			else if (mp->b_next == NULL || (mp->b_next != NULL &&
3077 			    (mp->b_next->b_band != mp->b_band)))
3078 				qbp->qb_last = mp;
3079 		}
3080 		qbp->qb_count += bytecnt;
3081 		qbp->qb_mblkcnt += mblkcnt;
3082 		if ((qbp->qb_count >= qbp->qb_hiwat) ||
3083 		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
3084 			qbp->qb_flag |= QB_FULL;
3085 		}
3086 	} else {
3087 		q->q_count += bytecnt;
3088 		q->q_mblkcnt += mblkcnt;
3089 		if ((q->q_count >= q->q_hiwat) ||
3090 		    (q->q_mblkcnt >= q->q_hiwat)) {
3091 			q->q_flag |= QFULL;
3092 		}
3093 	}
3094 
3095 	STR_FTEVENT_MSG(mp, q, FTEV_INSQ, NULL);
3096 
3097 	if (canenable(q) && (q->q_flag & QWANTR))
3098 		qenable_locked(q);
3099 
3100 	ASSERT(MUTEX_HELD(QLOCK(q)));
3101 	if (freezer != curthread)
3102 		mutex_exit(QLOCK(q));
3103 
3104 	return (1);
3105 }
3106 
3107 /*
3108  * Create and put a control message on queue.
3109  */
3110 int
3111 putctl(queue_t *q, int type)
3112 {
3113 	mblk_t *bp;
3114 
3115 	if ((datamsg(type) && (type != M_DELAY)) ||
3116 	    (bp = allocb_tryhard(0)) == NULL)
3117 		return (0);
3118 	bp->b_datap->db_type = (unsigned char) type;
3119 
3120 	put(q, bp);
3121 
3122 	return (1);
3123 }
3124 
3125 /*
3126  * Control message with a single-byte parameter
3127  */
3128 int
3129 putctl1(queue_t *q, int type, int param)
3130 {
3131 	mblk_t *bp;
3132 
3133 	if ((datamsg(type) && (type != M_DELAY)) ||
3134 	    (bp = allocb_tryhard(1)) == NULL)
3135 		return (0);
3136 	bp->b_datap->db_type = (unsigned char)type;
3137 	*bp->b_wptr++ = (unsigned char)param;
3138 
3139 	put(q, bp);
3140 
3141 	return (1);
3142 }
3143 
3144 int
3145 putnextctl1(queue_t *q, int type, int param)
3146 {
3147 	mblk_t *bp;
3148 
3149 	if ((datamsg(type) && (type != M_DELAY)) ||
3150 	    ((bp = allocb_tryhard(1)) == NULL))
3151 		return (0);
3152 
3153 	bp->b_datap->db_type = (unsigned char)type;
3154 	*bp->b_wptr++ = (unsigned char)param;
3155 
3156 	putnext(q, bp);
3157 
3158 	return (1);
3159 }
3160 
3161 int
3162 putnextctl(queue_t *q, int type)
3163 {
3164 	mblk_t *bp;
3165 
3166 	if ((datamsg(type) && (type != M_DELAY)) ||
3167 	    ((bp = allocb_tryhard(0)) == NULL))
3168 		return (0);
3169 	bp->b_datap->db_type = (unsigned char)type;
3170 
3171 	putnext(q, bp);
3172 
3173 	return (1);
3174 }
3175 
3176 /*
3177  * Return the queue upstream from this one
3178  */
3179 queue_t *
3180 backq(queue_t *q)
3181 {
3182 	q = _OTHERQ(q);
3183 	if (q->q_next) {
3184 		q = q->q_next;
3185 		return (_OTHERQ(q));
3186 	}
3187 	return (NULL);
3188 }
3189 
3190 /*
3191  * Send a block back up the queue in reverse from this
3192  * one (e.g. to respond to ioctls)
3193  */
3194 void
3195 qreply(queue_t *q, mblk_t *bp)
3196 {
3197 	ASSERT(q && bp);
3198 
3199 	putnext(_OTHERQ(q), bp);
3200 }
3201 
3202 /*
3203  * Streams Queue Scheduling
3204  *
3205  * Queues are enabled through qenable() when they have messages to
3206  * process.  They are serviced by queuerun(), which runs each enabled
3207  * queue's service procedure.  The call to queuerun() is processor
3208  * dependent - the general principle is that it be run whenever a queue
3209  * is enabled but before returning to user level.  For system calls,
3210  * the function runqueues() is called if their action causes a queue
3211  * to be enabled.  For device interrupts, queuerun() should be
3212  * called before returning from the last level of interrupt.  Beyond
3213  * this, no timing assumptions should be made about queue scheduling.
3214  */
3215 
3216 /*
3217  * Enable a queue: put it on list of those whose service procedures are
3218  * ready to run and set up the scheduling mechanism.
3219  * The broadcast is done outside the mutex -> to avoid the woken thread
3220  * from contending with the mutex. This is OK 'cos the queue has been
3221  * enqueued on the runlist and flagged safely at this point.
3222  */
3223 void
3224 qenable(queue_t *q)
3225 {
3226 	mutex_enter(QLOCK(q));
3227 	qenable_locked(q);
3228 	mutex_exit(QLOCK(q));
3229 }
3230 /*
3231  * Return number of messages on queue
3232  */
3233 int
3234 qsize(queue_t *qp)
3235 {
3236 	int count = 0;
3237 	mblk_t *mp;
3238 
3239 	mutex_enter(QLOCK(qp));
3240 	for (mp = qp->q_first; mp; mp = mp->b_next)
3241 		count++;
3242 	mutex_exit(QLOCK(qp));
3243 	return (count);
3244 }
3245 
3246 /*
3247  * noenable - set queue so that putq() will not enable it.
3248  * enableok - set queue so that putq() can enable it.
3249  */
3250 void
3251 noenable(queue_t *q)
3252 {
3253 	mutex_enter(QLOCK(q));
3254 	q->q_flag |= QNOENB;
3255 	mutex_exit(QLOCK(q));
3256 }
3257 
3258 void
3259 enableok(queue_t *q)
3260 {
3261 	mutex_enter(QLOCK(q));
3262 	q->q_flag &= ~QNOENB;
3263 	mutex_exit(QLOCK(q));
3264 }
3265 
3266 /*
3267  * Set queue fields.
3268  */
3269 int
3270 strqset(queue_t *q, qfields_t what, unsigned char pri, intptr_t val)
3271 {
3272 	qband_t *qbp = NULL;
3273 	queue_t	*wrq;
3274 	int error = 0;
3275 	kthread_id_t freezer;
3276 
3277 	freezer = STREAM(q)->sd_freezer;
3278 	if (freezer == curthread) {
3279 		ASSERT(frozenstr(q));
3280 		ASSERT(MUTEX_HELD(QLOCK(q)));
3281 	} else
3282 		mutex_enter(QLOCK(q));
3283 
3284 	if (what >= QBAD) {
3285 		error = EINVAL;
3286 		goto done;
3287 	}
3288 	if (pri != 0) {
3289 		int i;
3290 		qband_t **qbpp;
3291 
3292 		if (pri > q->q_nband) {
3293 			qbpp = &q->q_bandp;
3294 			while (*qbpp)
3295 				qbpp = &(*qbpp)->qb_next;
3296 			while (pri > q->q_nband) {
3297 				if ((*qbpp = allocband()) == NULL) {
3298 					error = EAGAIN;
3299 					goto done;
3300 				}
3301 				(*qbpp)->qb_hiwat = q->q_hiwat;
3302 				(*qbpp)->qb_lowat = q->q_lowat;
3303 				q->q_nband++;
3304 				qbpp = &(*qbpp)->qb_next;
3305 			}
3306 		}
3307 		qbp = q->q_bandp;
3308 		i = pri;
3309 		while (--i)
3310 			qbp = qbp->qb_next;
3311 	}
3312 	switch (what) {
3313 
3314 	case QHIWAT:
3315 		if (qbp)
3316 			qbp->qb_hiwat = (size_t)val;
3317 		else
3318 			q->q_hiwat = (size_t)val;
3319 		break;
3320 
3321 	case QLOWAT:
3322 		if (qbp)
3323 			qbp->qb_lowat = (size_t)val;
3324 		else
3325 			q->q_lowat = (size_t)val;
3326 		break;
3327 
3328 	case QMAXPSZ:
3329 		if (qbp)
3330 			error = EINVAL;
3331 		else
3332 			q->q_maxpsz = (ssize_t)val;
3333 
3334 		/*
3335 		 * Performance concern, strwrite looks at the module below
3336 		 * the stream head for the maxpsz each time it does a write
3337 		 * we now cache it at the stream head.  Check to see if this
3338 		 * queue is sitting directly below the stream head.
3339 		 */
3340 		wrq = STREAM(q)->sd_wrq;
3341 		if (q != wrq->q_next)
3342 			break;
3343 
3344 		/*
3345 		 * If the stream is not frozen drop the current QLOCK and
3346 		 * acquire the sd_wrq QLOCK which protects sd_qn_*
3347 		 */
3348 		if (freezer != curthread) {
3349 			mutex_exit(QLOCK(q));
3350 			mutex_enter(QLOCK(wrq));
3351 		}
3352 		ASSERT(MUTEX_HELD(QLOCK(wrq)));
3353 
3354 		if (strmsgsz != 0) {
3355 			if (val == INFPSZ)
3356 				val = strmsgsz;
3357 			else  {
3358 				if (STREAM(q)->sd_vnode->v_type == VFIFO)
3359 					val = MIN(PIPE_BUF, val);
3360 				else
3361 					val = MIN(strmsgsz, val);
3362 			}
3363 		}
3364 		STREAM(q)->sd_qn_maxpsz = val;
3365 		if (freezer != curthread) {
3366 			mutex_exit(QLOCK(wrq));
3367 			mutex_enter(QLOCK(q));
3368 		}
3369 		break;
3370 
3371 	case QMINPSZ:
3372 		if (qbp)
3373 			error = EINVAL;
3374 		else
3375 			q->q_minpsz = (ssize_t)val;
3376 
3377 		/*
3378 		 * Performance concern, strwrite looks at the module below
3379 		 * the stream head for the maxpsz each time it does a write
3380 		 * we now cache it at the stream head.  Check to see if this
3381 		 * queue is sitting directly below the stream head.
3382 		 */
3383 		wrq = STREAM(q)->sd_wrq;
3384 		if (q != wrq->q_next)
3385 			break;
3386 
3387 		/*
3388 		 * If the stream is not frozen drop the current QLOCK and
3389 		 * acquire the sd_wrq QLOCK which protects sd_qn_*
3390 		 */
3391 		if (freezer != curthread) {
3392 			mutex_exit(QLOCK(q));
3393 			mutex_enter(QLOCK(wrq));
3394 		}
3395 		STREAM(q)->sd_qn_minpsz = (ssize_t)val;
3396 
3397 		if (freezer != curthread) {
3398 			mutex_exit(QLOCK(wrq));
3399 			mutex_enter(QLOCK(q));
3400 		}
3401 		break;
3402 
3403 	case QSTRUIOT:
3404 		if (qbp)
3405 			error = EINVAL;
3406 		else
3407 			q->q_struiot = (ushort_t)val;
3408 		break;
3409 
3410 	case QCOUNT:
3411 	case QFIRST:
3412 	case QLAST:
3413 	case QFLAG:
3414 		error = EPERM;
3415 		break;
3416 
3417 	default:
3418 		error = EINVAL;
3419 		break;
3420 	}
3421 done:
3422 	if (freezer != curthread)
3423 		mutex_exit(QLOCK(q));
3424 	return (error);
3425 }
3426 
3427 /*
3428  * Get queue fields.
3429  */
3430 int
3431 strqget(queue_t *q, qfields_t what, unsigned char pri, void *valp)
3432 {
3433 	qband_t 	*qbp = NULL;
3434 	int 		error = 0;
3435 	kthread_id_t 	freezer;
3436 
3437 	freezer = STREAM(q)->sd_freezer;
3438 	if (freezer == curthread) {
3439 		ASSERT(frozenstr(q));
3440 		ASSERT(MUTEX_HELD(QLOCK(q)));
3441 	} else
3442 		mutex_enter(QLOCK(q));
3443 	if (what >= QBAD) {
3444 		error = EINVAL;
3445 		goto done;
3446 	}
3447 	if (pri != 0) {
3448 		int i;
3449 		qband_t **qbpp;
3450 
3451 		if (pri > q->q_nband) {
3452 			qbpp = &q->q_bandp;
3453 			while (*qbpp)
3454 				qbpp = &(*qbpp)->qb_next;
3455 			while (pri > q->q_nband) {
3456 				if ((*qbpp = allocband()) == NULL) {
3457 					error = EAGAIN;
3458 					goto done;
3459 				}
3460 				(*qbpp)->qb_hiwat = q->q_hiwat;
3461 				(*qbpp)->qb_lowat = q->q_lowat;
3462 				q->q_nband++;
3463 				qbpp = &(*qbpp)->qb_next;
3464 			}
3465 		}
3466 		qbp = q->q_bandp;
3467 		i = pri;
3468 		while (--i)
3469 			qbp = qbp->qb_next;
3470 	}
3471 	switch (what) {
3472 	case QHIWAT:
3473 		if (qbp)
3474 			*(size_t *)valp = qbp->qb_hiwat;
3475 		else
3476 			*(size_t *)valp = q->q_hiwat;
3477 		break;
3478 
3479 	case QLOWAT:
3480 		if (qbp)
3481 			*(size_t *)valp = qbp->qb_lowat;
3482 		else
3483 			*(size_t *)valp = q->q_lowat;
3484 		break;
3485 
3486 	case QMAXPSZ:
3487 		if (qbp)
3488 			error = EINVAL;
3489 		else
3490 			*(ssize_t *)valp = q->q_maxpsz;
3491 		break;
3492 
3493 	case QMINPSZ:
3494 		if (qbp)
3495 			error = EINVAL;
3496 		else
3497 			*(ssize_t *)valp = q->q_minpsz;
3498 		break;
3499 
3500 	case QCOUNT:
3501 		if (qbp)
3502 			*(size_t *)valp = qbp->qb_count;
3503 		else
3504 			*(size_t *)valp = q->q_count;
3505 		break;
3506 
3507 	case QFIRST:
3508 		if (qbp)
3509 			*(mblk_t **)valp = qbp->qb_first;
3510 		else
3511 			*(mblk_t **)valp = q->q_first;
3512 		break;
3513 
3514 	case QLAST:
3515 		if (qbp)
3516 			*(mblk_t **)valp = qbp->qb_last;
3517 		else
3518 			*(mblk_t **)valp = q->q_last;
3519 		break;
3520 
3521 	case QFLAG:
3522 		if (qbp)
3523 			*(uint_t *)valp = qbp->qb_flag;
3524 		else
3525 			*(uint_t *)valp = q->q_flag;
3526 		break;
3527 
3528 	case QSTRUIOT:
3529 		if (qbp)
3530 			error = EINVAL;
3531 		else
3532 			*(short *)valp = q->q_struiot;
3533 		break;
3534 
3535 	default:
3536 		error = EINVAL;
3537 		break;
3538 	}
3539 done:
3540 	if (freezer != curthread)
3541 		mutex_exit(QLOCK(q));
3542 	return (error);
3543 }
3544 
3545 /*
3546  * Function awakes all in cvwait/sigwait/pollwait, on one of:
3547  *	QWANTWSYNC or QWANTR or QWANTW,
3548  *
3549  * Note: for QWANTWSYNC/QWANTW and QWANTR, if no WSLEEPer or RSLEEPer then a
3550  *	 deferred wakeup will be done. Also if strpoll() in progress then a
3551  *	 deferred pollwakeup will be done.
3552  */
3553 void
3554 strwakeq(queue_t *q, int flag)
3555 {
3556 	stdata_t 	*stp = STREAM(q);
3557 	pollhead_t 	*pl;
3558 
3559 	mutex_enter(&stp->sd_lock);
3560 	pl = &stp->sd_pollist;
3561 	if (flag & QWANTWSYNC) {
3562 		ASSERT(!(q->q_flag & QREADR));
3563 		if (stp->sd_flag & WSLEEP) {
3564 			stp->sd_flag &= ~WSLEEP;
3565 			cv_broadcast(&stp->sd_wrq->q_wait);
3566 		} else {
3567 			stp->sd_wakeq |= WSLEEP;
3568 		}
3569 
3570 		mutex_exit(&stp->sd_lock);
3571 		pollwakeup(pl, POLLWRNORM);
3572 		mutex_enter(&stp->sd_lock);
3573 
3574 		if (stp->sd_sigflags & S_WRNORM)
3575 			strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
3576 	} else if (flag & QWANTR) {
3577 		if (stp->sd_flag & RSLEEP) {
3578 			stp->sd_flag &= ~RSLEEP;
3579 			cv_broadcast(&_RD(stp->sd_wrq)->q_wait);
3580 		} else {
3581 			stp->sd_wakeq |= RSLEEP;
3582 		}
3583 
3584 		mutex_exit(&stp->sd_lock);
3585 		pollwakeup(pl, POLLIN | POLLRDNORM);
3586 		mutex_enter(&stp->sd_lock);
3587 
3588 		{
3589 			int events = stp->sd_sigflags & (S_INPUT | S_RDNORM);
3590 
3591 			if (events)
3592 				strsendsig(stp->sd_siglist, events, 0, 0);
3593 		}
3594 	} else {
3595 		if (stp->sd_flag & WSLEEP) {
3596 			stp->sd_flag &= ~WSLEEP;
3597 			cv_broadcast(&stp->sd_wrq->q_wait);
3598 		}
3599 
3600 		mutex_exit(&stp->sd_lock);
3601 		pollwakeup(pl, POLLWRNORM);
3602 		mutex_enter(&stp->sd_lock);
3603 
3604 		if (stp->sd_sigflags & S_WRNORM)
3605 			strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
3606 	}
3607 	mutex_exit(&stp->sd_lock);
3608 }
3609 
/*
 * Move data described by the uio in 'dp' into the mblk chain 'mp'.
 *
 * Each mblk whose dblk is flagged STRUIO_SPEC and not yet STRUIO_DONE
 * receives up to (db_cksumend - db_cksumstuff) bytes at
 * b_rptr + db_cksumstuff via uiomove(); it is then marked STRUIO_DONE and
 * db_cksumstuff is advanced by the amount moved.  The walk stops when the
 * uio has no residual left or the chain ends.
 *
 *	q	queue used only to find the stream and its struio type
 *	mp	head of the mblk chain to fill
 *	dp	struio descriptor supplying the uio (d_uio)
 *	noblock	if nonzero, the copy runs under on_trap(OT_DATA_ACCESS) and
 *		a trap is reported as EWOULDBLOCK instead of being taken
 *
 * Returns 0 on success, the error from uiomove(), EWOULDBLOCK as described
 * above, or EIO if the stream's struio type is not STRUIOT_STANDARD.
 */
int
struioget(queue_t *q, mblk_t *mp, struiod_t *dp, int noblock)
{
	stdata_t *stp = STREAM(q);
	int typ  = STRUIOT_STANDARD;
	uio_t	 *uiop = &dp->d_uio;
	dblk_t	 *dbp;
	ssize_t	 uiocnt;
	ssize_t	 cnt;
	unsigned char *ptr;
	ssize_t	 resid;
	int	 error = 0;
	on_trap_data_t otd;
	queue_t	*stwrq;

	/*
	 * Plumbing may change while taking the type so store the
	 * queue in a temporary variable. It doesn't matter even
	 * if we take the type from the previous plumbing,
	 * that's because if the plumbing has changed when we were
	 * holding the queue in a temporary variable, we can continue
	 * processing the message the way it would have been processed
	 * in the old plumbing, without any side effects but a bit
	 * extra processing for partial ip header checksum.
	 *
	 * This has been done to avoid holding the sd_lock which is
	 * very hot.
	 */

	stwrq = stp->sd_struiowrq;
	if (stwrq)
		typ = stwrq->q_struiot;

	/*
	 * resid is re-sampled at the top of every iteration so that, on a
	 * fault, the code at 'out' can tell how much the last uiomove()
	 * managed to transfer.
	 */
	for (; (resid = uiop->uio_resid) > 0 && mp; mp = mp->b_cont) {
		dbp = mp->b_datap;
		ptr = (uchar_t *)(mp->b_rptr + dbp->db_cksumstuff);
		uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
		cnt = MIN(uiocnt, uiop->uio_resid);
		if (!(dbp->db_struioflag & STRUIO_SPEC) ||
		    (dbp->db_struioflag & STRUIO_DONE) || cnt == 0) {
			/*
			 * Either this mblk has already been processed
			 * or there is no more room in this mblk (?).
			 */
			continue;
		}
		switch (typ) {
		case STRUIOT_STANDARD:
			if (noblock) {
				/*
				 * A nonzero return from on_trap() means a
				 * data access trap was caught during the
				 * copy below; report it rather than block.
				 */
				if (on_trap(&otd, OT_DATA_ACCESS)) {
					no_trap();
					error = EWOULDBLOCK;
					goto out;
				}
			}
			if (error = uiomove(ptr, cnt, UIO_WRITE, uiop)) {
				if (noblock)
					no_trap();
				goto out;
			}
			if (noblock)
				no_trap();
			break;

		default:
			error = EIO;
			goto out;
		}
		dbp->db_struioflag |= STRUIO_DONE;
		dbp->db_cksumstuff += cnt;
	}
out:
	if (error == EWOULDBLOCK && (resid -= uiop->uio_resid) > 0) {
		/*
		 * A fault has occurred and some bytes were moved to the
		 * current mblk, the uio_t has already been updated by
		 * the appropriate uio routine, so also update the mblk
		 * to reflect this in case this same mblk chain is used
		 * again (after the fault has been handled).
		 */
		uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
		if (uiocnt >= resid)
			dbp->db_cksumstuff += resid;
	}
	return (error);
}
3696 
3697 /*
3698  * Try to enter queue synchronously. Any attempt to enter a closing queue will
3699  * fails. The qp->q_rwcnt keeps track of the number of successful entries so
3700  * that removeq() will not try to close the queue while a thread is inside the
3701  * queue.
3702  */
3703 static boolean_t
3704 rwnext_enter(queue_t *qp)
3705 {
3706 	mutex_enter(QLOCK(qp));
3707 	if (qp->q_flag & QWCLOSE) {
3708 		mutex_exit(QLOCK(qp));
3709 		return (B_FALSE);
3710 	}
3711 	qp->q_rwcnt++;
3712 	ASSERT(qp->q_rwcnt != 0);
3713 	mutex_exit(QLOCK(qp));
3714 	return (B_TRUE);
3715 }
3716 
3717 /*
3718  * Decrease the count of threads running in sync stream queue and wake up any
3719  * threads blocked in removeq().
3720  */
3721 static void
3722 rwnext_exit(queue_t *qp)
3723 {
3724 	mutex_enter(QLOCK(qp));
3725 	qp->q_rwcnt--;
3726 	if (qp->q_flag & QWANTRMQSYNC) {
3727 		qp->q_flag &= ~QWANTRMQSYNC;
3728 		cv_broadcast(&qp->q_wait);
3729 	}
3730 	mutex_exit(QLOCK(qp));
3731 }
3732 
/*
 * The purpose of rwnext() is to call the rw procedure of the next
 * (downstream) module's queue.
 *
 * It is treated as a put entrypoint for perimeter synchronization.
 *
 * There's no need to grab sq_putlocks here (which only exist for CIPUT
 * sync queues). If it is a CIPUT sync queue sq_count is incremented and it
 * does not matter if any regular put entrypoints have been already entered.
 * We can't increment one of the sq_putcounts (instead of sq_count) because
 * qwait_rw won't know which counter to decrement.
 *
 * It would be reasonable to add the lockless FASTPUT logic.
 *
 * Returns EINVAL if the next queue is not synchronous, has no rw
 * procedure, is closing, or the struio plumbing went away while waiting;
 * EWOULDBLOCK if the write queue is flow controlled (QWANTWSYNC is set on
 * it so the caller is woken later); otherwise whatever the rw procedure
 * returns.
 */
int
rwnext(queue_t *qp, struiod_t *dp)
{
	queue_t		*nqp;
	syncq_t		*sq;
	uint16_t	count;
	uint16_t	flags;
	struct qinit	*qi;
	int		(*proc)();
	struct stdata	*stp;
	int		isread;
	int		rval;

	stp = STREAM(qp);
	/*
	 * Prevent q_next from changing by holding sd_lock until acquiring
	 * SQLOCK. Note that a read-side rwnext from the streamhead will
	 * already have sd_lock acquired. In either case sd_lock is always
	 * released after acquiring SQLOCK.
	 *
	 * The streamhead read-side holding sd_lock when calling rwnext is
	 * required to prevent a race condition where M_DATA mblks flowing
	 * up the read-side of the stream could be bypassed by a rwnext()
	 * down-call. In this case sd_lock acts as the streamhead perimeter.
	 */
	if ((nqp = _WR(qp)) == qp) {
		/* Caller passed a write queue: target is the next one down. */
		isread = 0;
		mutex_enter(&stp->sd_lock);
		qp = nqp->q_next;
	} else {
		/*
		 * Caller passed a read queue: target is the read side of
		 * the module below.
		 */
		isread = 1;
		if (nqp != stp->sd_wrq)
			/* Not streamhead */
			mutex_enter(&stp->sd_lock);
		qp = _RD(nqp->q_next);
	}
	qi = qp->q_qinfo;
	if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_rwp)) {
		/*
		 * Not a synchronous module or no r/w procedure for this
		 * queue, so just return EINVAL and let the caller handle it.
		 */
		mutex_exit(&stp->sd_lock);
		return (EINVAL);
	}

	/* Register ourselves in the target queue; fails if it is closing. */
	if (rwnext_enter(qp) == B_FALSE) {
		mutex_exit(&stp->sd_lock);
		return (EINVAL);
	}

	sq = qp->q_syncq;
	mutex_enter(SQLOCK(sq));
	mutex_exit(&stp->sd_lock);
	count = sq->sq_count;
	flags = sq->sq_flags;
	ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));

	/*
	 * Entry is allowed once no SQ_GOAWAY condition is set and, for a
	 * non-CIPUT syncq, nobody else holds a claim on it.
	 */
	while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
		/*
		 * if this queue is being closed, return.
		 */
		if (qp->q_flag & QWCLOSE) {
			mutex_exit(SQLOCK(sq));
			rwnext_exit(qp);
			return (EINVAL);
		}

		/*
		 * Wait until we can enter the inner perimeter.
		 */
		sq->sq_flags = flags | SQ_WANTWAKEUP;
		cv_wait(&sq->sq_wait, SQLOCK(sq));
		count = sq->sq_count;
		flags = sq->sq_flags;
	}

	if (isread == 0 && stp->sd_struiowrq == NULL ||
	    isread == 1 && stp->sd_struiordq == NULL) {
		/*
		 * Stream plumbing changed while waiting for inner perimeter
		 * so just return EINVAL and let the caller handle it.
		 */
		mutex_exit(SQLOCK(sq));
		rwnext_exit(qp);
		return (EINVAL);
	}
	/* Non-CIPUT perimeters are entered exclusively. */
	if (!(flags & SQ_CIPUT))
		sq->sq_flags = flags | SQ_EXCL;
	sq->sq_count = count + 1;
	ASSERT(sq->sq_count != 0);		/* Wraparound */
	/*
	 * Note: The only message ordering guarantee that rwnext() makes is
	 *	 for the write queue flow-control case. All others (r/w queue
	 *	 with q_count > 0 (or q_first != 0)) are the responsibility of
	 *	 the queue's rw procedure. This could be generalized here by
	 *	 running the queue's service procedure, but that wouldn't be
	 *	 the most efficient for all cases.
	 */
	mutex_exit(SQLOCK(sq));
	if (! isread && (qp->q_flag & QFULL)) {
		/*
		 * Write queue may be flow controlled. If so,
		 * mark the queue for wakeup when it's not.
		 * QFULL is re-tested under QLOCK since the first test was
		 * made without it.
		 */
		mutex_enter(QLOCK(qp));
		if (qp->q_flag & QFULL) {
			qp->q_flag |= QWANTWSYNC;
			mutex_exit(QLOCK(qp));
			rval = EWOULDBLOCK;
			goto out;
		}
		mutex_exit(QLOCK(qp));
	}

	/* Flow-trace the message before a write-side call ... */
	if (! isread && dp->d_mp)
		STR_FTEVENT_MSG(dp->d_mp, nqp, FTEV_RWNEXT, dp->d_mp->b_rptr -
		    dp->d_mp->b_datap->db_base);

	rval = (*proc)(qp, dp);

	/* ... and after a read-side call, when d_mp has been filled in. */
	if (isread && dp->d_mp)
		STR_FTEVENT_MSG(dp->d_mp, _RD(nqp), FTEV_RWNEXT,
		    dp->d_mp->b_rptr - dp->d_mp->b_datap->db_base);
out:
	/*
	 * The queue is protected from being freed by sq_count, so it is
	 * safe to call rwnext_exit and reacquire SQLOCK(sq).
	 */
	rwnext_exit(qp);

	mutex_enter(SQLOCK(sq));
	flags = sq->sq_flags;
	ASSERT(sq->sq_count != 0);
	sq->sq_count--;
	if (flags & SQ_TAIL) {
		/*
		 * Note there is no mutex_exit(SQLOCK(sq)) on this path;
		 * presumably putnext_tail() releases it - confirm against
		 * its definition.
		 */
		putnext_tail(sq, qp, flags);
		/*
		 * The only purpose of this ASSERT is to preserve calling stack
		 * in DEBUG kernel.
		 */
		ASSERT(flags & SQ_TAIL);
		return (rval);
	}
	ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
	/*
	 * Safe to always drop SQ_EXCL:
	 *	Not SQ_CIPUT means we set SQ_EXCL above
	 *	For SQ_CIPUT SQ_EXCL will only be set if the put procedure
	 *	did a qwriter(INNER) in which case nobody else
	 *	is in the inner perimeter and we are exiting.
	 *
	 * I would like to make the following assertion:
	 *
	 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
	 * 	sq->sq_count == 0);
	 *
	 * which indicates that if we are both putshared and exclusive,
	 * we became exclusive while executing the putproc, and the only
	 * claim on the syncq was the one we dropped a few lines above.
	 * But other threads that enter putnext while the syncq is exclusive
	 * need to make a claim as they may need to drop SQLOCK in the
	 * has_writers case to avoid deadlocks.  If these threads are
	 * delayed or preempted, it is possible that the writer thread can
	 * find out that there are other claims making the (sq_count == 0)
	 * test invalid.
	 */

	sq->sq_flags = flags & ~SQ_EXCL;
	if (sq->sq_flags & SQ_WANTWAKEUP) {
		sq->sq_flags &= ~SQ_WANTWAKEUP;
		cv_broadcast(&sq->sq_wait);
	}
	mutex_exit(SQLOCK(sq));
	return (rval);
}
3923 
/*
 * The purpose of infonext() is to call the info procedure of the next
 * (downstream) module's queue.
 *
 * It is treated as a put entrypoint for perimeter synchronization.
 *
 * There's no need to grab sq_putlocks here (which only exist for CIPUT
 * sync queues). If it is a CIPUT sync queue the regular sq_count is
 * incremented and it does not matter if any regular put entrypoints have
 * been already entered.
 *
 * Returns EINVAL if the next queue is not synchronous or has no info
 * procedure; otherwise whatever the info procedure returns.
 */
int
infonext(queue_t *qp, infod_t *idp)
{
	queue_t		*nqp;
	syncq_t		*sq;
	uint16_t	count;
	uint16_t 	flags;
	struct qinit	*qi;
	int		(*proc)();
	struct stdata	*stp;
	int		rval;

	stp = STREAM(qp);
	/*
	 * Prevent q_next from changing by holding sd_lock until
	 * acquiring SQLOCK.
	 */
	mutex_enter(&stp->sd_lock);
	/* Pick the next queue on the same side (write or read) as qp. */
	if ((nqp = _WR(qp)) == qp) {
		qp = nqp->q_next;
	} else {
		qp = _RD(nqp->q_next);
	}
	qi = qp->q_qinfo;
	if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_infop)) {
		mutex_exit(&stp->sd_lock);
		return (EINVAL);
	}
	sq = qp->q_syncq;
	mutex_enter(SQLOCK(sq));
	mutex_exit(&stp->sd_lock);
	count = sq->sq_count;
	flags = sq->sq_flags;
	ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));

	/*
	 * Entry is allowed once no SQ_GOAWAY condition is set and, for a
	 * non-CIPUT syncq, nobody else holds a claim on it.
	 */
	while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
		/*
		 * Wait until we can enter the inner perimeter.
		 */
		sq->sq_flags = flags | SQ_WANTWAKEUP;
		cv_wait(&sq->sq_wait, SQLOCK(sq));
		count = sq->sq_count;
		flags = sq->sq_flags;
	}

	/* Non-CIPUT perimeters are entered exclusively. */
	if (! (flags & SQ_CIPUT))
		sq->sq_flags = flags | SQ_EXCL;
	sq->sq_count = count + 1;
	ASSERT(sq->sq_count != 0);		/* Wraparound */
	mutex_exit(SQLOCK(sq));

	rval = (*proc)(qp, idp);

	mutex_enter(SQLOCK(sq));
	flags = sq->sq_flags;
	ASSERT(sq->sq_count != 0);
	sq->sq_count--;
	if (flags & SQ_TAIL) {
		/*
		 * Note there is no mutex_exit(SQLOCK(sq)) on this path;
		 * presumably putnext_tail() releases it - confirm against
		 * its definition.
		 */
		putnext_tail(sq, qp, flags);
		/*
		 * The only purpose of this ASSERT is to preserve calling stack
		 * in DEBUG kernel.
		 */
		ASSERT(flags & SQ_TAIL);
		return (rval);
	}
	ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
	/*
	 * XXXX
	 * I am not certain the next comment is correct here.  I need to
	 * consider why the infonext is called, and if dropping SQ_EXCL
	 * unless non-CIPUT might cause other problems.  It just might be
	 * safer to drop it if !SQ_CIPUT because that is when we set it.
	 */
	/*
	 * Safe to always drop SQ_EXCL:
	 *	Not SQ_CIPUT means we set SQ_EXCL above
	 *	For SQ_CIPUT SQ_EXCL will only be set if the put procedure
	 *	did a qwriter(INNER) in which case nobody else
	 *	is in the inner perimeter and we are exiting.
	 *
	 * I would like to make the following assertion:
	 *
	 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
	 *	sq->sq_count == 0);
	 *
	 * which indicates that if we are both putshared and exclusive,
	 * we became exclusive while executing the putproc, and the only
	 * claim on the syncq was the one we dropped a few lines above.
	 * But other threads that enter putnext while the syncq is exclusive
	 * need to make a claim as they may need to drop SQLOCK in the
	 * has_writers case to avoid deadlocks.  If these threads are
	 * delayed or preempted, it is possible that the writer thread can
	 * find out that there are other claims making the (sq_count == 0)
	 * test invalid.
	 */

	sq->sq_flags = flags & ~SQ_EXCL;
	mutex_exit(SQLOCK(sq));
	return (rval);
}
4036 
4037 /*
4038  * Return nonzero if the queue is responsible for struio(), else return 0.
4039  */
4040 int
4041 isuioq(queue_t *q)
4042 {
4043 	if (q->q_flag & QREADR)
4044 		return (STREAM(q)->sd_struiordq == q);
4045 	else
4046 		return (STREAM(q)->sd_struiowrq == q);
4047 }
4048 
/*
 * When nonzero, create_putlocks() returns without creating any per-cpu
 * putlocks.  Putlocks are enabled by default on sparc only.
 */
#if !defined(__sparc)
int disable_putlocks = 1;
#else
int disable_putlocks = 0;
#endif
4054 
4055 /*
4056  * called by create_putlock.
4057  */
4058 static void
4059 create_syncq_putlocks(queue_t *q)
4060 {
4061 	syncq_t	*sq = q->q_syncq;
4062 	ciputctrl_t *cip;
4063 	int i;
4064 
4065 	ASSERT(sq != NULL);
4066 
4067 	ASSERT(disable_putlocks == 0);
4068 	ASSERT(n_ciputctrl >= min_n_ciputctrl);
4069 	ASSERT(ciputctrl_cache != NULL);
4070 
4071 	if (!(sq->sq_type & SQ_CIPUT))
4072 		return;
4073 
4074 	for (i = 0; i <= 1; i++) {
4075 		if (sq->sq_ciputctrl == NULL) {
4076 			cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
4077 			SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
4078 			mutex_enter(SQLOCK(sq));
4079 			if (sq->sq_ciputctrl != NULL) {
4080 				mutex_exit(SQLOCK(sq));
4081 				kmem_cache_free(ciputctrl_cache, cip);
4082 			} else {
4083 				ASSERT(sq->sq_nciputctrl == 0);
4084 				sq->sq_nciputctrl = n_ciputctrl - 1;
4085 				/*
4086 				 * putnext checks sq_ciputctrl without holding
4087 				 * SQLOCK. if it is not NULL putnext assumes
4088 				 * sq_nciputctrl is initialized. membar below
4089 				 * insures that.
4090 				 */
4091 				membar_producer();
4092 				sq->sq_ciputctrl = cip;
4093 				mutex_exit(SQLOCK(sq));
4094 			}
4095 		}
4096 		ASSERT(sq->sq_nciputctrl == n_ciputctrl - 1);
4097 		if (i == 1)
4098 			break;
4099 		q = _OTHERQ(q);
4100 		if (!(q->q_flag & QPERQ)) {
4101 			ASSERT(sq == q->q_syncq);
4102 			break;
4103 		}
4104 		ASSERT(q->q_syncq != NULL);
4105 		ASSERT(sq != q->q_syncq);
4106 		sq = q->q_syncq;
4107 		ASSERT(sq->sq_type & SQ_CIPUT);
4108 	}
4109 }
4110 
4111 /*
4112  * If stream argument is 0 only create per cpu sq_putlocks/sq_putcounts for
4113  * syncq of q. If stream argument is not 0 create per cpu stream_putlocks for
4114  * the stream of q and per cpu sq_putlocks/sq_putcounts for all syncq's
4115  * starting from q and down to the driver.
4116  *
4117  * This should be called after the affected queues are part of stream
4118  * geometry. It should be called from driver/module open routine after
4119  * qprocson() call. It is also called from nfs syscall where it is known that
4120  * stream is configured and won't change its geometry during create_putlock
4121  * call.
4122  *
4123  * caller normally uses 0 value for the stream argument to speed up MT putnext
4124  * into the perimeter of q for example because its perimeter is per module
4125  * (e.g. IP).
4126  *
4127  * caller normally uses non 0 value for the stream argument to hint the system
4128  * that the stream of q is a very contended global system stream
4129  * (e.g. NFS/UDP) and the part of the stream from q to the driver is
4130  * particularly MT hot.
4131  *
4132  * Caller insures stream plumbing won't happen while we are here and therefore
4133  * q_next can be safely used.
4134  */
4135 
4136 void
4137 create_putlocks(queue_t *q, int stream)
4138 {
4139 	ciputctrl_t	*cip;
4140 	struct stdata	*stp = STREAM(q);
4141 
4142 	q = _WR(q);
4143 	ASSERT(stp != NULL);
4144 
4145 	if (disable_putlocks != 0)
4146 		return;
4147 
4148 	if (n_ciputctrl < min_n_ciputctrl)
4149 		return;
4150 
4151 	ASSERT(ciputctrl_cache != NULL);
4152 
4153 	if (stream != 0 && stp->sd_ciputctrl == NULL) {
4154 		cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
4155 		SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
4156 		mutex_enter(&stp->sd_lock);
4157 		if (stp->sd_ciputctrl != NULL) {
4158 			mutex_exit(&stp->sd_lock);
4159 			kmem_cache_free(ciputctrl_cache, cip);
4160 		} else {
4161 			ASSERT(stp->sd_nciputctrl == 0);
4162 			stp->sd_nciputctrl = n_ciputctrl - 1;
4163 			/*
4164 			 * putnext checks sd_ciputctrl without holding
4165 			 * sd_lock. if it is not NULL putnext assumes
4166 			 * sd_nciputctrl is initialized. membar below
4167 			 * insures that.
4168 			 */
4169 			membar_producer();
4170 			stp->sd_ciputctrl = cip;
4171 			mutex_exit(&stp->sd_lock);
4172 		}
4173 	}
4174 
4175 	ASSERT(stream == 0 || stp->sd_nciputctrl == n_ciputctrl - 1);
4176 
4177 	while (_SAMESTR(q)) {
4178 		create_syncq_putlocks(q);
4179 		if (stream == 0)
4180 			return;
4181 		q = q->q_next;
4182 	}
4183 	ASSERT(q != NULL);
4184 	create_syncq_putlocks(q);
4185 }
4186 
4187 /*
4188  * STREAMS Flow Trace - record STREAMS Flow Trace events as an mblk flows
4189  * through a stream.
4190  *
4191  * Data currently record per-event is a timestamp, module/driver name,
4192  * downstream module/driver name, optional callstack, event type and a per
4193  * type datum.  Much of the STREAMS framework is instrumented for automatic
4194  * flow tracing (when enabled).  Events can be defined and used by STREAMS
4195  * modules and drivers.
4196  *
4197  * Global objects:
4198  *
4199  *	str_ftevent() - Add a flow-trace event to a dblk.
4200  *	str_ftfree() - Free flow-trace data
4201  *
4202  * Local objects:
4203  *
4204  *	fthdr_cache - pointer to the kmem cache for trace header.
4205  *	ftblk_cache - pointer to the kmem cache for trace data blocks.
4206  */
4207 
/* Nonzero: don't do STREAMS flow tracing. */
int str_ftnever = 1;

/* Zero: don't record event call stacks. */
int str_ftstack = 0;
4210 
4211 void
4212 str_ftevent(fthdr_t *hp, void *p, ushort_t evnt, ushort_t data)
4213 {
4214 	ftblk_t *bp = hp->tail;
4215 	ftblk_t *nbp;
4216 	ftevnt_t *ep;
4217 	int ix, nix;
4218 
4219 	ASSERT(hp != NULL);
4220 
4221 	for (;;) {
4222 		if ((ix = bp->ix) == FTBLK_EVNTS) {
4223 			/*
4224 			 * Tail doesn't have room, so need a new tail.
4225 			 *
4226 			 * To make this MT safe, first, allocate a new
4227 			 * ftblk, and initialize it.  To make life a
4228 			 * little easier, reserve the first slot (mostly
4229 			 * by making ix = 1).  When we are finished with
4230 			 * the initialization, CAS this pointer to the
4231 			 * tail.  If this succeeds, this is the new
4232 			 * "next" block.  Otherwise, another thread
4233 			 * got here first, so free the block and start
4234 			 * again.
4235 			 */
4236 			nbp = kmem_cache_alloc(ftblk_cache, KM_NOSLEEP);
4237 			if (nbp == NULL) {
4238 				/* no mem, so punt */
4239 				str_ftnever++;
4240 				/* free up all flow data? */
4241 				return;
4242 			}
4243 			nbp->nxt = NULL;
4244 			nbp->ix = 1;
4245 			/*
4246 			 * Just in case there is another thread about
4247 			 * to get the next index, we need to make sure
4248 			 * the value is there for it.
4249 			 */
4250 			membar_producer();
4251 			if (casptr(&hp->tail, bp, nbp) == bp) {
4252 				/* CAS was successful */
4253 				bp->nxt = nbp;
4254 				membar_producer();
4255 				bp = nbp;
4256 				ix = 0;
4257 				goto cas_good;
4258 			} else {
4259 				kmem_cache_free(ftblk_cache, nbp);
4260 				bp = hp->tail;
4261 				continue;
4262 			}
4263 		}
4264 		nix = ix + 1;
4265 		if (cas32((uint32_t *)&bp->ix, ix, nix) == ix) {
4266 		cas_good:
4267 			if (curthread != hp->thread) {
4268 				hp->thread = curthread;
4269 				evnt |= FTEV_CS;
4270 			}
4271 			if (CPU->cpu_seqid != hp->cpu_seqid) {
4272 				hp->cpu_seqid = CPU->cpu_seqid;
4273 				evnt |= FTEV_PS;
4274 			}
4275 			ep = &bp->ev[ix];
4276 			break;
4277 		}
4278 	}
4279 
4280 	if (evnt & FTEV_QMASK) {
4281 		queue_t *qp = p;
4282 
4283 		if (!(qp->q_flag & QREADR))
4284 			evnt |= FTEV_ISWR;
4285 
4286 		ep->mid = Q2NAME(qp);
4287 
4288 		/*
4289 		 * We only record the next queue name for FTEV_PUTNEXT since
4290 		 * that's the only time we *really* need it, and the putnext()
4291 		 * code ensures that qp->q_next won't vanish.  (We could use
4292 		 * claimstr()/releasestr() but at a performance cost.)
4293 		 */
4294 		if ((evnt & FTEV_MASK) == FTEV_PUTNEXT && qp->q_next != NULL)
4295 			ep->midnext = Q2NAME(qp->q_next);
4296 		else
4297 			ep->midnext = NULL;
4298 	} else {
4299 		ep->mid = p;
4300 		ep->midnext = NULL;
4301 	}
4302 
4303 	if (ep->stk != NULL)
4304 		ep->stk->fs_depth = getpcstack(ep->stk->fs_stk, FTSTK_DEPTH);
4305 
4306 	ep->ts = gethrtime();
4307 	ep->evnt = evnt;
4308 	ep->data = data;
4309 	hp->hash = (hp->hash << 9) + hp->hash;
4310 	hp->hash += (evnt << 16) | data;
4311 	hp->hash += (uintptr_t)ep->mid;
4312 }
4313 
4314 /*
4315  * Free flow-trace data.
4316  */
4317 void
4318 str_ftfree(dblk_t *dbp)
4319 {
4320 	fthdr_t *hp = dbp->db_fthdr;
4321 	ftblk_t *bp = &hp->first;
4322 	ftblk_t *nbp;
4323 
4324 	if (bp != hp->tail || bp->ix != 0) {
4325 		/*
4326 		 * Clear out the hash, have the tail point to itself, and free
4327 		 * any continuation blocks.
4328 		 */
4329 		bp = hp->first.nxt;
4330 		hp->tail = &hp->first;
4331 		hp->hash = 0;
4332 		hp->first.nxt = NULL;
4333 		hp->first.ix = 0;
4334 		while (bp != NULL) {
4335 			nbp = bp->nxt;
4336 			kmem_cache_free(ftblk_cache, bp);
4337 			bp = nbp;
4338 		}
4339 	}
4340 	kmem_cache_free(fthdr_cache, hp);
4341 	dbp->db_fthdr = NULL;
4342 }
4343