xref: /illumos-gate/usr/src/uts/common/inet/squeue.c (revision 8e268185036f404515ce8bc575423ac390136e3f)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
23  */
24 
25 /*
26  * Copyright 2012 Joyent, Inc.  All rights reserved.
27  */
28 
29 /*
30  * Squeues: General purpose serialization mechanism
31  * ------------------------------------------------
32  *
33  * Background:
34  * -----------
35  *
36  * This is a general purpose high-performance serialization mechanism
37  * currently used by TCP/IP. It is implement by means of a per CPU queue,
38  * a worker thread and a polling thread with are bound to the CPU
39  * associated with the squeue. The squeue is strictly FIFO for both read
40  * and write side and only one thread can process it at any given time.
41  * The design goal of squeue was to offer a very high degree of
42  * parallelization (on a per H/W execution pipeline basis) with at
43  * most one queuing.
44  *
45  * The modules needing protection typically calls SQUEUE_ENTER_ONE() or
46  * SQUEUE_ENTER() macro as soon as a thread enter the module
47  * from either direction. For each packet, the processing function
48  * and argument is stored in the mblk itself. When the packet is ready
49  * to be processed, the squeue retrieves the stored function and calls
50  * it with the supplied argument and the pointer to the packet itself.
51  * The called function can assume that no other thread is processing
52  * the squeue when it is executing.
53  *
54  * Squeue/connection binding:
55  * --------------------------
56  *
57  * TCP/IP uses an IP classifier in conjunction with squeue where specific
58  * connections are assigned to specific squeue (based on various policies),
59  * at the connection creation time. Once assigned, the connection to
60  * squeue mapping is never changed and all future packets for that
61  * connection are processed on that squeue. The connection ("conn") to
62  * squeue mapping is stored in "conn_t" member "conn_sqp".
63  *
64  * Since the processing of the connection cuts across multiple layers
65  * but still allows packets for different connnection to be processed on
66  * other CPU/squeues, squeues are also termed as "Vertical Perimeter" or
67  * "Per Connection Vertical Perimeter".
68  *
69  * Processing Model:
70  * -----------------
71  *
72  * Squeue doesn't necessary processes packets with its own worker thread.
73  * The callers can pick if they just want to queue the packet, process
74  * their packet if nothing is queued or drain and process. The first two
75  * modes are typically employed when the packet was generated while
76  * already doing the processing behind the squeue and last mode (drain
77  * and process) is typically employed when the thread is entering squeue
78  * for the first time. The squeue still imposes a finite time limit
79  * for which a external thread can do processing after which it switches
80  * processing to its own worker thread.
81  *
82  * Once created, squeues are never deleted. Hence squeue pointers are
83  * always valid. This means that functions outside the squeue can still
84  * refer safely to conn_sqp and their is no need for ref counts.
85  *
86  * Only a thread executing in the squeue can change the squeue of the
87  * connection. It does so by calling a squeue framework function to do this.
88  * After changing the squeue, the thread must leave the squeue. It must not
89  * continue to execute any code that needs squeue protection.
90  *
91  * The squeue framework, after entering the squeue, checks if the current
92  * squeue matches the conn_sqp. If the check fails, the packet is delivered
93  * to right squeue.
94  *
95  * Polling Model:
96  * --------------
97  *
98  * Squeues can control the rate of packet arrival into itself from the
99  * NIC or specific Rx ring within a NIC. As part of capability negotiation
100  * between IP and MAC layer, squeue are created for each TCP soft ring
101  * (or TCP Rx ring - to be implemented in future). As part of this
102  * negotiation, squeues get a cookie for underlying soft ring or Rx
103  * ring, a function to turn off incoming packets and a function to call
104  * to poll for packets. This helps schedule the receive side packet
105  * processing so that queue backlog doesn't build up and packet processing
106  * doesn't keep getting disturbed by high priority interrupts. As part
107  * of this mode, as soon as a backlog starts building, squeue turns off
108  * the interrupts and switches to poll mode. In poll mode, when poll
109  * thread goes down to retrieve packets, it retrieves them in the form of
110  * a chain which improves performance even more. As the squeue/softring
111  * system gets more packets, it gets more efficient by switching to
112  * polling more often and dealing with larger packet chains.
113  *
114  */
115 
116 #include <sys/types.h>
117 #include <sys/cmn_err.h>
118 #include <sys/debug.h>
119 #include <sys/kmem.h>
120 #include <sys/cpuvar.h>
121 #include <sys/condvar_impl.h>
122 #include <sys/systm.h>
123 #include <sys/callb.h>
124 #include <sys/sdt.h>
125 #include <sys/ddi.h>
126 #include <sys/sunddi.h>
127 #include <sys/stack.h>
128 #include <sys/archsystm.h>
129 
130 #include <inet/ipclassifier.h>
131 #include <inet/udp_impl.h>
132 
133 #include <sys/squeue_impl.h>
134 
135 static void squeue_fire(void *);
136 static void squeue_drain(squeue_t *, uint_t, hrtime_t);
137 static void squeue_worker(squeue_t *sqp);
138 static void squeue_polling_thread(squeue_t *sqp);
139 
140 kmem_cache_t *squeue_cache;
141 
142 #define	SQUEUE_MSEC_TO_NSEC 1000000
143 
144 int squeue_drain_ms = 20;
145 int squeue_workerwait_ms = 0;
146 
147 /* The values above converted to ticks or nano seconds */
148 static int squeue_drain_ns = 0;
149 static int squeue_workerwait_tick = 0;
150 
151 uintptr_t squeue_drain_stack_needed = 10240;
152 uint_t squeue_drain_stack_toodeep;
153 
154 #define	MAX_BYTES_TO_PICKUP	150000
155 
156 #define	ENQUEUE_CHAIN(sqp, mp, tail, cnt) {			\
157 	/*							\
158 	 * Enqueue our mblk chain.				\
159 	 */							\
160 	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
161 								\
162 	if ((sqp)->sq_last != NULL)				\
163 		(sqp)->sq_last->b_next = (mp);			\
164 	else							\
165 		(sqp)->sq_first = (mp);				\
166 	(sqp)->sq_last = (tail);				\
167 	(sqp)->sq_count += (cnt);				\
168 	ASSERT((sqp)->sq_count > 0);				\
169 	DTRACE_PROBE4(squeue__enqueuechain, squeue_t *, sqp,	\
170 		mblk_t *, mp, mblk_t *, tail, int, cnt);	\
171 								\
172 }
173 
174 /*
175  * Blank the receive ring (in this case it is the soft ring). When
176  * blanked, the soft ring will not send any more packets up.
177  * Blanking may not succeed when there is a CPU already in the soft
178  * ring sending packets up. In that case, SQS_POLLING will not be
179  * set.
180  */
181 #define	SQS_POLLING_ON(sqp, sq_poll_capable, rx_ring) {		\
182 	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
183 	if (sq_poll_capable) {					\
184 		ASSERT(rx_ring != NULL);			\
185 		ASSERT(sqp->sq_state & SQS_POLL_CAPAB);		\
186 		if (!(sqp->sq_state & SQS_POLLING)) {		\
187 			if (rx_ring->rr_intr_disable(rx_ring->rr_intr_handle)) \
188 				sqp->sq_state |= SQS_POLLING;	\
189 		}						\
190 	}							\
191 }
192 
193 #define	SQS_POLLING_OFF(sqp, sq_poll_capable, rx_ring) {	\
194 	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
195 	if (sq_poll_capable) {					\
196 		ASSERT(rx_ring != NULL);			\
197 		ASSERT(sqp->sq_state & SQS_POLL_CAPAB);		\
198 		if (sqp->sq_state & SQS_POLLING) {		\
199 			sqp->sq_state &= ~SQS_POLLING;		\
200 			rx_ring->rr_intr_enable(rx_ring->rr_intr_handle); \
201 		}						\
202 	}							\
203 }
204 
205 /* Wakeup poll thread only if SQS_POLLING is set */
206 #define	SQS_POLL_RING(sqp) {			\
207 	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
208 	if (sqp->sq_state & SQS_POLLING) {			\
209 		ASSERT(sqp->sq_state & SQS_POLL_CAPAB);		\
210 		if (!(sqp->sq_state & SQS_GET_PKTS)) {		\
211 			sqp->sq_state |= SQS_GET_PKTS;		\
212 			cv_signal(&sqp->sq_poll_cv);		\
213 		}						\
214 	}							\
215 }
216 
217 #ifdef DEBUG
218 #define	SQUEUE_DBG_SET(sqp, mp, proc, connp, tag) {		\
219 	(sqp)->sq_curmp = (mp);					\
220 	(sqp)->sq_curproc = (proc);				\
221 	(sqp)->sq_connp = (connp);				\
222 	(mp)->b_tag = (sqp)->sq_tag = (tag);			\
223 }
224 
225 #define	SQUEUE_DBG_CLEAR(sqp)	{				\
226 	(sqp)->sq_curmp = NULL;					\
227 	(sqp)->sq_curproc = NULL;				\
228 	(sqp)->sq_connp = NULL;					\
229 }
230 #else
231 #define	SQUEUE_DBG_SET(sqp, mp, proc, connp, tag)
232 #define	SQUEUE_DBG_CLEAR(sqp)
233 #endif
234 
235 void
236 squeue_init(void)
237 {
238 	squeue_cache = kmem_cache_create("squeue_cache",
239 	    sizeof (squeue_t), 64, NULL, NULL, NULL, NULL, NULL, 0);
240 
241 	squeue_drain_ns = squeue_drain_ms * SQUEUE_MSEC_TO_NSEC;
242 	squeue_workerwait_tick = MSEC_TO_TICK_ROUNDUP(squeue_workerwait_ms);
243 }
244 
245 /* ARGSUSED */
246 squeue_t *
247 squeue_create(clock_t wait, pri_t pri)
248 {
249 	squeue_t *sqp = kmem_cache_alloc(squeue_cache, KM_SLEEP);
250 
251 	bzero(sqp, sizeof (squeue_t));
252 	sqp->sq_bind = PBIND_NONE;
253 	sqp->sq_priority = pri;
254 	sqp->sq_wait = MSEC_TO_TICK(wait);
255 	sqp->sq_worker = thread_create(NULL, 0, squeue_worker,
256 	    sqp, 0, &p0, TS_RUN, pri);
257 
258 	sqp->sq_poll_thr = thread_create(NULL, 0, squeue_polling_thread,
259 	    sqp, 0, &p0, TS_RUN, pri);
260 
261 	sqp->sq_enter = squeue_enter;
262 	sqp->sq_drain = squeue_drain;
263 
264 	return (sqp);
265 }
266 
267 /*
268  * Bind squeue worker thread to the specified CPU, given by CPU id.
269  * If the CPU id  value is -1, bind the worker thread to the value
270  * specified in sq_bind field. If a thread is already bound to a
271  * different CPU, unbind it from the old CPU and bind to the new one.
272  */
273 
274 void
275 squeue_bind(squeue_t *sqp, processorid_t bind)
276 {
277 	mutex_enter(&sqp->sq_lock);
278 	ASSERT(sqp->sq_bind != PBIND_NONE || bind != PBIND_NONE);
279 	ASSERT(MUTEX_HELD(&cpu_lock));
280 
281 	if (sqp->sq_state & SQS_BOUND) {
282 		if (sqp->sq_bind == bind) {
283 			mutex_exit(&sqp->sq_lock);
284 			return;
285 		}
286 		thread_affinity_clear(sqp->sq_worker);
287 	} else {
288 		sqp->sq_state |= SQS_BOUND;
289 	}
290 
291 	if (bind != PBIND_NONE)
292 		sqp->sq_bind = bind;
293 
294 	thread_affinity_set(sqp->sq_worker, sqp->sq_bind);
295 	mutex_exit(&sqp->sq_lock);
296 }
297 
298 void
299 squeue_unbind(squeue_t *sqp)
300 {
301 	mutex_enter(&sqp->sq_lock);
302 	if (!(sqp->sq_state & SQS_BOUND)) {
303 		mutex_exit(&sqp->sq_lock);
304 		return;
305 	}
306 
307 	sqp->sq_state &= ~SQS_BOUND;
308 	thread_affinity_clear(sqp->sq_worker);
309 	mutex_exit(&sqp->sq_lock);
310 }
311 
312 void
313 squeue_worker_wakeup(squeue_t *sqp)
314 {
315 	timeout_id_t tid = (sqp)->sq_tid;
316 
317 	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));
318 
319 	if (sqp->sq_wait == 0) {
320 		ASSERT(tid == 0);
321 		ASSERT(!(sqp->sq_state & SQS_TMO_PROG));
322 		sqp->sq_awaken = ddi_get_lbolt();
323 		cv_signal(&sqp->sq_worker_cv);
324 		mutex_exit(&sqp->sq_lock);
325 		return;
326 	}
327 
328 	/*
329 	 * Queue isn't being processed, so take
330 	 * any post enqueue actions needed before leaving.
331 	 */
332 	if (tid != 0) {
333 		/*
334 		 * Waiting for an enter() to process mblk(s).
335 		 */
336 		clock_t now = ddi_get_lbolt();
337 		clock_t	waited = now - sqp->sq_awaken;
338 
339 		if (TICK_TO_MSEC(waited) >= sqp->sq_wait) {
340 			/*
341 			 * Times up and have a worker thread
342 			 * waiting for work, so schedule it.
343 			 */
344 			sqp->sq_tid = 0;
345 			sqp->sq_awaken = now;
346 			cv_signal(&sqp->sq_worker_cv);
347 			mutex_exit(&sqp->sq_lock);
348 			(void) untimeout(tid);
349 			return;
350 		}
351 		mutex_exit(&sqp->sq_lock);
352 		return;
353 	} else if (sqp->sq_state & SQS_TMO_PROG) {
354 		mutex_exit(&sqp->sq_lock);
355 		return;
356 	} else {
357 		clock_t	wait = sqp->sq_wait;
358 		/*
359 		 * Wait up to sqp->sq_wait ms for an
360 		 * enter() to process this queue. We
361 		 * don't want to contend on timeout locks
362 		 * with sq_lock held for performance reasons,
363 		 * so drop the sq_lock before calling timeout
364 		 * but we need to check if timeout is required
365 		 * after re acquiring the sq_lock. Once
366 		 * the sq_lock is dropped, someone else could
367 		 * have processed the packet or the timeout could
368 		 * have already fired.
369 		 */
370 		sqp->sq_state |= SQS_TMO_PROG;
371 		mutex_exit(&sqp->sq_lock);
372 		tid = timeout(squeue_fire, sqp, wait);
373 		mutex_enter(&sqp->sq_lock);
374 		/* Check again if we still need the timeout */
375 		if (((sqp->sq_state & (SQS_PROC|SQS_TMO_PROG)) ==
376 		    SQS_TMO_PROG) && (sqp->sq_tid == 0) &&
377 		    (sqp->sq_first != NULL)) {
378 				sqp->sq_state &= ~SQS_TMO_PROG;
379 				sqp->sq_tid = tid;
380 				mutex_exit(&sqp->sq_lock);
381 				return;
382 		} else {
383 			if (sqp->sq_state & SQS_TMO_PROG) {
384 				sqp->sq_state &= ~SQS_TMO_PROG;
385 				mutex_exit(&sqp->sq_lock);
386 				(void) untimeout(tid);
387 			} else {
388 				/*
389 				 * The timer fired before we could
390 				 * reacquire the sq_lock. squeue_fire
391 				 * removes the SQS_TMO_PROG flag
392 				 * and we don't need to	do anything
393 				 * else.
394 				 */
395 				mutex_exit(&sqp->sq_lock);
396 			}
397 		}
398 	}
399 
400 	ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
401 }
402 
403 /*
404  * squeue_enter() - enter squeue sqp with mblk mp (which can be
405  * a chain), while tail points to the end and cnt in number of
406  * mblks in the chain.
407  *
408  * For a chain of single packet (i.e. mp == tail), go through the
409  * fast path if no one is processing the squeue and nothing is queued.
410  *
411  * The proc and arg for each mblk is already stored in the mblk in
412  * appropriate places.
413  *
414  * The process_flag specifies if we are allowed to process the mblk
415  * and drain in the entering thread context. If process_flag is
416  * SQ_FILL, then we just queue the mblk and return (after signaling
417  * the worker thread if no one else is processing the squeue).
418  *
419  * The ira argument can be used when the count is one.
420  * For a chain the caller needs to prepend any needed mblks from
421  * ip_recv_attr_to_mblk().
422  */
423 /* ARGSUSED */
424 void
425 squeue_enter(squeue_t *sqp, mblk_t *mp, mblk_t *tail, uint32_t cnt,
426     ip_recv_attr_t *ira, int process_flag, uint8_t tag)
427 {
428 	conn_t		*connp;
429 	sqproc_t	proc;
430 	hrtime_t	now;
431 
432 	ASSERT(sqp != NULL);
433 	ASSERT(mp != NULL);
434 	ASSERT(tail != NULL);
435 	ASSERT(cnt > 0);
436 	ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
437 	ASSERT(ira == NULL || cnt == 1);
438 
439 	mutex_enter(&sqp->sq_lock);
440 
441 	/*
442 	 * Try to process the packet if SQ_FILL flag is not set and
443 	 * we are allowed to process the squeue. The SQ_NODRAIN is
444 	 * ignored if the packet chain consists of more than 1 packet.
445 	 */
446 	if (!(sqp->sq_state & SQS_PROC) && ((process_flag == SQ_PROCESS) ||
447 	    (process_flag == SQ_NODRAIN && sqp->sq_first == NULL))) {
448 		/*
449 		 * See if anything is already queued. If we are the
450 		 * first packet, do inline processing else queue the
451 		 * packet and do the drain.
452 		 */
453 		if (sqp->sq_first == NULL && cnt == 1) {
454 			/*
455 			 * Fast-path, ok to process and nothing queued.
456 			 */
457 			sqp->sq_state |= (SQS_PROC|SQS_FAST);
458 			sqp->sq_run = curthread;
459 			mutex_exit(&sqp->sq_lock);
460 
461 			/*
462 			 * We are the chain of 1 packet so
463 			 * go through this fast path.
464 			 */
465 			ASSERT(mp->b_prev != NULL);
466 			ASSERT(mp->b_queue != NULL);
467 			connp = (conn_t *)mp->b_prev;
468 			mp->b_prev = NULL;
469 			proc = (sqproc_t)mp->b_queue;
470 			mp->b_queue = NULL;
471 			ASSERT(proc != NULL && connp != NULL);
472 			ASSERT(mp->b_next == NULL);
473 
474 			/*
475 			 * Handle squeue switching. More details in the
476 			 * block comment at the top of the file
477 			 */
478 			if (connp->conn_sqp == sqp) {
479 				SQUEUE_DBG_SET(sqp, mp, proc, connp,
480 				    tag);
481 				connp->conn_on_sqp = B_TRUE;
482 				DTRACE_PROBE3(squeue__proc__start, squeue_t *,
483 				    sqp, mblk_t *, mp, conn_t *, connp);
484 				(*proc)(connp, mp, sqp, ira);
485 				DTRACE_PROBE2(squeue__proc__end, squeue_t *,
486 				    sqp, conn_t *, connp);
487 				connp->conn_on_sqp = B_FALSE;
488 				SQUEUE_DBG_CLEAR(sqp);
489 				CONN_DEC_REF(connp);
490 			} else {
491 				SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc,
492 				    connp, ira, SQ_FILL, SQTAG_SQUEUE_CHANGE);
493 			}
494 			ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
495 			mutex_enter(&sqp->sq_lock);
496 			sqp->sq_state &= ~(SQS_PROC|SQS_FAST);
497 			sqp->sq_run = NULL;
498 			if (sqp->sq_first == NULL ||
499 			    process_flag == SQ_NODRAIN) {
500 				if (sqp->sq_first != NULL) {
501 					squeue_worker_wakeup(sqp);
502 					return;
503 				}
504 				/*
505 				 * We processed inline our packet and nothing
506 				 * new has arrived. We are done. In case any
507 				 * control actions are pending, wake up the
508 				 * worker.
509 				 */
510 				if (sqp->sq_state & SQS_WORKER_THR_CONTROL)
511 					cv_signal(&sqp->sq_worker_cv);
512 				mutex_exit(&sqp->sq_lock);
513 				return;
514 			}
515 		} else {
516 			if (ira != NULL) {
517 				mblk_t	*attrmp;
518 
519 				ASSERT(cnt == 1);
520 				attrmp = ip_recv_attr_to_mblk(ira);
521 				if (attrmp == NULL) {
522 					mutex_exit(&sqp->sq_lock);
523 					ip_drop_input("squeue: "
524 					    "ip_recv_attr_to_mblk",
525 					    mp, NULL);
526 					/* Caller already set b_prev/b_next */
527 					mp->b_prev = mp->b_next = NULL;
528 					freemsg(mp);
529 					return;
530 				}
531 				ASSERT(attrmp->b_cont == NULL);
532 				attrmp->b_cont = mp;
533 				/* Move connp and func to new */
534 				attrmp->b_queue = mp->b_queue;
535 				mp->b_queue = NULL;
536 				attrmp->b_prev = mp->b_prev;
537 				mp->b_prev = NULL;
538 
539 				ASSERT(mp == tail);
540 				tail = mp = attrmp;
541 			}
542 
543 			ENQUEUE_CHAIN(sqp, mp, tail, cnt);
544 #ifdef DEBUG
545 			mp->b_tag = tag;
546 #endif
547 		}
548 		/*
549 		 * We are here because either we couldn't do inline
550 		 * processing (because something was already queued),
551 		 * or we had a chain of more than one packet,
552 		 * or something else arrived after we were done with
553 		 * inline processing.
554 		 */
555 		ASSERT(MUTEX_HELD(&sqp->sq_lock));
556 		ASSERT(sqp->sq_first != NULL);
557 		now = gethrtime();
558 		sqp->sq_run = curthread;
559 		sqp->sq_drain(sqp, SQS_ENTER, now + squeue_drain_ns);
560 
561 		/*
562 		 * If we didn't do a complete drain, the worker
563 		 * thread was already signalled by squeue_drain.
564 		 * In case any control actions are pending, wake
565 		 * up the worker.
566 		 */
567 		sqp->sq_run = NULL;
568 		if (sqp->sq_state & SQS_WORKER_THR_CONTROL)
569 			cv_signal(&sqp->sq_worker_cv);
570 		mutex_exit(&sqp->sq_lock);
571 		return;
572 	} else {
573 		/*
574 		 * We let a thread processing a squeue reenter only
575 		 * once. This helps the case of incoming connection
576 		 * where a SYN-ACK-ACK that triggers the conn_ind
577 		 * doesn't have to queue the packet if listener and
578 		 * eager are on the same squeue. Also helps the
579 		 * loopback connection where the two ends are bound
580 		 * to the same squeue (which is typical on single
581 		 * CPU machines).
582 		 *
583 		 * We let the thread reenter only once for the fear
584 		 * of stack getting blown with multiple traversal.
585 		 */
586 		connp = (conn_t *)mp->b_prev;
587 		if (!(sqp->sq_state & SQS_REENTER) &&
588 		    (process_flag != SQ_FILL) && (sqp->sq_first == NULL) &&
589 		    (sqp->sq_run == curthread) && (cnt == 1) &&
590 		    (connp->conn_on_sqp == B_FALSE)) {
591 			sqp->sq_state |= SQS_REENTER;
592 			mutex_exit(&sqp->sq_lock);
593 
594 			ASSERT(mp->b_prev != NULL);
595 			ASSERT(mp->b_queue != NULL);
596 
597 			mp->b_prev = NULL;
598 			proc = (sqproc_t)mp->b_queue;
599 			mp->b_queue = NULL;
600 
601 			/*
602 			 * Handle squeue switching. More details in the
603 			 * block comment at the top of the file
604 			 */
605 			if (connp->conn_sqp == sqp) {
606 				connp->conn_on_sqp = B_TRUE;
607 				DTRACE_PROBE3(squeue__proc__start, squeue_t *,
608 				    sqp, mblk_t *, mp, conn_t *, connp);
609 				(*proc)(connp, mp, sqp, ira);
610 				DTRACE_PROBE2(squeue__proc__end, squeue_t *,
611 				    sqp, conn_t *, connp);
612 				connp->conn_on_sqp = B_FALSE;
613 				CONN_DEC_REF(connp);
614 			} else {
615 				SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc,
616 				    connp, ira, SQ_FILL, SQTAG_SQUEUE_CHANGE);
617 			}
618 
619 			mutex_enter(&sqp->sq_lock);
620 			sqp->sq_state &= ~SQS_REENTER;
621 			mutex_exit(&sqp->sq_lock);
622 			return;
623 		}
624 
625 		/*
626 		 * Queue is already being processed or there is already
627 		 * one or more paquets on the queue. Enqueue the
628 		 * packet and wakeup the squeue worker thread if the
629 		 * squeue is not being processed.
630 		 */
631 #ifdef DEBUG
632 		mp->b_tag = tag;
633 #endif
634 		if (ira != NULL) {
635 			mblk_t	*attrmp;
636 
637 			ASSERT(cnt == 1);
638 			attrmp = ip_recv_attr_to_mblk(ira);
639 			if (attrmp == NULL) {
640 				mutex_exit(&sqp->sq_lock);
641 				ip_drop_input("squeue: ip_recv_attr_to_mblk",
642 				    mp, NULL);
643 				/* Caller already set b_prev/b_next */
644 				mp->b_prev = mp->b_next = NULL;
645 				freemsg(mp);
646 				return;
647 			}
648 			ASSERT(attrmp->b_cont == NULL);
649 			attrmp->b_cont = mp;
650 			/* Move connp and func to new */
651 			attrmp->b_queue = mp->b_queue;
652 			mp->b_queue = NULL;
653 			attrmp->b_prev = mp->b_prev;
654 			mp->b_prev = NULL;
655 
656 			ASSERT(mp == tail);
657 			tail = mp = attrmp;
658 		}
659 		ENQUEUE_CHAIN(sqp, mp, tail, cnt);
660 		if (!(sqp->sq_state & SQS_PROC)) {
661 			squeue_worker_wakeup(sqp);
662 			return;
663 		}
664 		/*
665 		 * In case any control actions are pending, wake
666 		 * up the worker.
667 		 */
668 		if (sqp->sq_state & SQS_WORKER_THR_CONTROL)
669 			cv_signal(&sqp->sq_worker_cv);
670 		mutex_exit(&sqp->sq_lock);
671 		return;
672 	}
673 }
674 
675 /*
676  * PRIVATE FUNCTIONS
677  */
678 
679 static void
680 squeue_fire(void *arg)
681 {
682 	squeue_t	*sqp = arg;
683 	uint_t		state;
684 
685 	mutex_enter(&sqp->sq_lock);
686 
687 	state = sqp->sq_state;
688 	if (sqp->sq_tid == 0 && !(state & SQS_TMO_PROG)) {
689 		mutex_exit(&sqp->sq_lock);
690 		return;
691 	}
692 
693 	sqp->sq_tid = 0;
694 	/*
695 	 * The timeout fired before we got a chance to set it.
696 	 * Process it anyway but remove the SQS_TMO_PROG so that
697 	 * the guy trying to set the timeout knows that it has
698 	 * already been processed.
699 	 */
700 	if (state & SQS_TMO_PROG)
701 		sqp->sq_state &= ~SQS_TMO_PROG;
702 
703 	if (!(state & SQS_PROC)) {
704 		sqp->sq_awaken = ddi_get_lbolt();
705 		cv_signal(&sqp->sq_worker_cv);
706 	}
707 	mutex_exit(&sqp->sq_lock);
708 }
709 
710 static void
711 squeue_drain(squeue_t *sqp, uint_t proc_type, hrtime_t expire)
712 {
713 	mblk_t		*mp;
714 	mblk_t 		*head;
715 	sqproc_t 	proc;
716 	conn_t		*connp;
717 	timeout_id_t 	tid;
718 	ill_rx_ring_t	*sq_rx_ring = sqp->sq_rx_ring;
719 	hrtime_t 	now;
720 	boolean_t	did_wakeup = B_FALSE;
721 	boolean_t	sq_poll_capable;
722 	ip_recv_attr_t	*ira, iras;
723 
724 	/*
725 	 * Before doing any work, check our stack depth; if we're not a
726 	 * worker thread for this squeue and we're beginning to get tight on
727 	 * on stack, kick the worker, bump a counter and return.
728 	 */
729 	if (proc_type != SQS_WORKER && STACK_BIAS + (uintptr_t)getfp() -
730 	    (uintptr_t)curthread->t_stkbase < squeue_drain_stack_needed) {
731 		ASSERT(mutex_owned(&sqp->sq_lock));
732 		sqp->sq_awaken = ddi_get_lbolt();
733 		cv_signal(&sqp->sq_worker_cv);
734 		squeue_drain_stack_toodeep++;
735 		return;
736 	}
737 
738 	sq_poll_capable = (sqp->sq_state & SQS_POLL_CAPAB) != 0;
739 again:
740 	ASSERT(mutex_owned(&sqp->sq_lock));
741 	ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
742 	    SQS_POLL_QUIESCE_DONE)));
743 
744 	head = sqp->sq_first;
745 	sqp->sq_first = NULL;
746 	sqp->sq_last = NULL;
747 	sqp->sq_count = 0;
748 
749 	if ((tid = sqp->sq_tid) != 0)
750 		sqp->sq_tid = 0;
751 
752 	sqp->sq_state |= SQS_PROC | proc_type;
753 
754 	/*
755 	 * We have backlog built up. Switch to polling mode if the
756 	 * device underneath allows it. Need to do it so that
757 	 * more packets don't come in and disturb us (by contending
758 	 * for sq_lock or higher priority thread preempting us).
759 	 *
760 	 * The worker thread is allowed to do active polling while we
761 	 * just disable the interrupts for drain by non worker (kernel
762 	 * or userland) threads so they can peacefully process the
763 	 * packets during time allocated to them.
764 	 */
765 	SQS_POLLING_ON(sqp, sq_poll_capable, sq_rx_ring);
766 	mutex_exit(&sqp->sq_lock);
767 
768 	if (tid != 0)
769 		(void) untimeout(tid);
770 
771 	while ((mp = head) != NULL) {
772 
773 		head = mp->b_next;
774 		mp->b_next = NULL;
775 
776 		proc = (sqproc_t)mp->b_queue;
777 		mp->b_queue = NULL;
778 		connp = (conn_t *)mp->b_prev;
779 		mp->b_prev = NULL;
780 
781 		/* Is there an ip_recv_attr_t to handle? */
782 		if (ip_recv_attr_is_mblk(mp)) {
783 			mblk_t	*attrmp = mp;
784 
785 			ASSERT(attrmp->b_cont != NULL);
786 
787 			mp = attrmp->b_cont;
788 			attrmp->b_cont = NULL;
789 			ASSERT(mp->b_queue == NULL);
790 			ASSERT(mp->b_prev == NULL);
791 
792 			if (!ip_recv_attr_from_mblk(attrmp, &iras)) {
793 				/* The ill or ip_stack_t disappeared on us */
794 				ip_drop_input("ip_recv_attr_from_mblk",
795 				    mp, NULL);
796 				ira_cleanup(&iras, B_TRUE);
797 				CONN_DEC_REF(connp);
798 				continue;
799 			}
800 			ira = &iras;
801 		} else {
802 			ira = NULL;
803 		}
804 
805 
806 		/*
807 		 * Handle squeue switching. More details in the
808 		 * block comment at the top of the file
809 		 */
810 		if (connp->conn_sqp == sqp) {
811 			SQUEUE_DBG_SET(sqp, mp, proc, connp,
812 			    mp->b_tag);
813 			connp->conn_on_sqp = B_TRUE;
814 			DTRACE_PROBE3(squeue__proc__start, squeue_t *,
815 			    sqp, mblk_t *, mp, conn_t *, connp);
816 			(*proc)(connp, mp, sqp, ira);
817 			DTRACE_PROBE2(squeue__proc__end, squeue_t *,
818 			    sqp, conn_t *, connp);
819 			connp->conn_on_sqp = B_FALSE;
820 			CONN_DEC_REF(connp);
821 		} else {
822 			SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc, connp, ira,
823 			    SQ_FILL, SQTAG_SQUEUE_CHANGE);
824 		}
825 		if (ira != NULL)
826 			ira_cleanup(ira, B_TRUE);
827 	}
828 
829 	SQUEUE_DBG_CLEAR(sqp);
830 
831 	mutex_enter(&sqp->sq_lock);
832 
833 	/*
834 	 * Check if there is still work to do (either more arrived or timer
835 	 * expired). If we are the worker thread and we are polling capable,
836 	 * continue doing the work since no one else is around to do the
837 	 * work anyway (but signal the poll thread to retrieve some packets
838 	 * in the meanwhile). If we are not the worker thread, just
839 	 * signal the worker thread to take up the work if processing time
840 	 * has expired.
841 	 */
842 	if (sqp->sq_first != NULL) {
843 		/*
844 		 * Still more to process. If time quanta not expired, we
845 		 * should let the drain go on. The worker thread is allowed
846 		 * to drain as long as there is anything left.
847 		 */
848 		now = gethrtime();
849 		if ((now < expire) || (proc_type == SQS_WORKER)) {
850 			/*
851 			 * If time not expired or we are worker thread and
852 			 * this squeue is polling capable, continue to do
853 			 * the drain.
854 			 *
855 			 * We turn off interrupts for all userland threads
856 			 * doing drain but we do active polling only for
857 			 * worker thread.
858 			 *
859 			 * Calling SQS_POLL_RING() even in the case of
860 			 * SQS_POLLING_ON() not succeeding is ok as
861 			 * SQS_POLL_RING() will not wake up poll thread
862 			 * if SQS_POLLING bit is not set.
863 			 */
864 			if (proc_type == SQS_WORKER)
865 				SQS_POLL_RING(sqp);
866 			goto again;
867 		} else {
868 			did_wakeup = B_TRUE;
869 			sqp->sq_awaken = ddi_get_lbolt();
870 			cv_signal(&sqp->sq_worker_cv);
871 		}
872 	}
873 
874 	/*
875 	 * If the poll thread is already running, just return. The
876 	 * poll thread continues to hold the proc and will finish
877 	 * processing.
878 	 */
879 	if (sqp->sq_state & SQS_GET_PKTS) {
880 		ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
881 		    SQS_POLL_QUIESCE_DONE)));
882 		sqp->sq_state &= ~proc_type;
883 		return;
884 	}
885 
886 	/*
887 	 *
888 	 * If we are the worker thread and no work is left, send the poll
889 	 * thread down once more to see if something arrived. Otherwise,
890 	 * turn the interrupts back on and we are done.
891 	 */
892 	if ((proc_type == SQS_WORKER) && (sqp->sq_state & SQS_POLLING)) {
893 		/*
894 		 * Do one last check to see if anything arrived
895 		 * in the NIC. We leave the SQS_PROC set to ensure
896 		 * that poll thread keeps the PROC and can decide
897 		 * if it needs to turn polling off or continue
898 		 * processing.
899 		 *
900 		 * If we drop the SQS_PROC here and poll thread comes
901 		 * up empty handed, it can not safely turn polling off
902 		 * since someone else could have acquired the PROC
903 		 * and started draining. The previously running poll
904 		 * thread and the current thread doing drain would end
905 		 * up in a race for turning polling on/off and more
906 		 * complex code would be required to deal with it.
907 		 *
908 		 * Its lot simpler for drain to hand the SQS_PROC to
909 		 * poll thread (if running) and let poll thread finish
910 		 * without worrying about racing with any other thread.
911 		 */
912 		ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
913 		    SQS_POLL_QUIESCE_DONE)));
914 		SQS_POLL_RING(sqp);
915 		sqp->sq_state &= ~proc_type;
916 	} else {
917 		/*
918 		 * The squeue is either not capable of polling or the
919 		 * attempt to blank (i.e., turn SQS_POLLING_ON()) was
920 		 * unsuccessful or poll thread already finished
921 		 * processing and didn't find anything. Since there
922 		 * is nothing queued and we already turn polling on
923 		 * (for all threads doing drain), we should turn
924 		 * polling off and relinquish the PROC.
925 		 */
926 		ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
927 		    SQS_POLL_QUIESCE_DONE)));
928 		SQS_POLLING_OFF(sqp, sq_poll_capable, sq_rx_ring);
929 		sqp->sq_state &= ~(SQS_PROC | proc_type);
930 		if (!did_wakeup && sqp->sq_first != NULL) {
931 			squeue_worker_wakeup(sqp);
932 			mutex_enter(&sqp->sq_lock);
933 		}
934 		/*
935 		 * If we are not the worker and there is a pending quiesce
936 		 * event, wake up the worker
937 		 */
938 		if ((proc_type != SQS_WORKER) &&
939 		    (sqp->sq_state & SQS_WORKER_THR_CONTROL))
940 			cv_signal(&sqp->sq_worker_cv);
941 	}
942 }
943 
944 /*
945  * Quiesce, Restart, or Cleanup of the squeue poll thread.
946  *
947  * Quiesce and Restart: After an squeue poll thread has been quiesced, it does
948  * not attempt to poll the underlying soft ring any more. The quiesce is
949  * triggered by the mac layer when it wants to quiesce a soft ring. Typically
950  * control operations such as changing the fanout of a NIC or VNIC (dladm
951  * setlinkprop) need to quiesce data flow before changing the wiring.
952  * The operation is done by the mac layer, but it calls back into IP to
953  * quiesce the soft ring. After completing the operation (say increase or
954  * decrease of the fanout) the mac layer then calls back into IP to restart
955  * the quiesced soft ring.
956  *
957  * Cleanup: This is triggered when the squeue binding to a soft ring is
958  * removed permanently. Typically interface plumb and unplumb would trigger
959  * this. It can also be triggered from the mac layer when a soft ring is
960  * being deleted say as the result of a fanout reduction. Since squeues are
961  * never deleted, the cleanup marks the squeue as fit for recycling and
962  * moves it to the zeroth squeue set.
963  */
964 static void
965 squeue_poll_thr_control(squeue_t *sqp)
966 {
967 	if (sqp->sq_state & SQS_POLL_THR_RESTART) {
968 		/* Restart implies a previous quiesce */
969 		ASSERT(sqp->sq_state & SQS_POLL_THR_QUIESCED);
970 		sqp->sq_state &= ~(SQS_POLL_THR_QUIESCED |
971 		    SQS_POLL_THR_RESTART);
972 		sqp->sq_state |= SQS_POLL_CAPAB;
973 		cv_signal(&sqp->sq_worker_cv);
974 		return;
975 	}
976 
977 	if (sqp->sq_state & SQS_POLL_THR_QUIESCE) {
978 		sqp->sq_state |= SQS_POLL_THR_QUIESCED;
979 		sqp->sq_state &= ~SQS_POLL_THR_QUIESCE;
980 		cv_signal(&sqp->sq_worker_cv);
981 		return;
982 	}
983 }
984 
985 /*
986  * POLLING Notes
987  *
988  * With polling mode, we want to do as much processing as we possibly can
989  * in worker thread context. The sweet spot is worker thread keeps doing
990  * work all the time in polling mode and writers etc. keep dumping packets
991  * to worker thread. Occassionally, we send the poll thread (running at
992  * lower priority to NIC to get the chain of packets to feed to worker).
993  * Sending the poll thread down to NIC is dependant on 3 criterions
994  *
995  * 1) Its always driven from squeue_drain and only if worker thread is
996  *	doing the drain.
997  * 2) We clear the backlog once and more packets arrived in between.
998  *	Before starting drain again, send the poll thread down if
999  *	the drain is being done by worker thread.
1000  * 3) Before exiting the squeue_drain, if the poll thread is not already
1001  *	working and we are the worker thread, try to poll one more time.
1002  *
1003  * For latency sake, we do allow any thread calling squeue_enter
1004  * to process its packet provided:
1005  *
1006  * 1) Nothing is queued
1007  * 2) If more packets arrived in between, the non worker thread are allowed
1008  *	to do the drain till their time quanta expired provided SQS_GET_PKTS
1009  *	wasn't set in between.
1010  *
1011  * Avoiding deadlocks with interrupts
1012  * ==================================
1013  *
1014  * One of the big problem is that we can't send poll_thr down while holding
1015  * the sq_lock since the thread can block. So we drop the sq_lock before
1016  * calling sq_get_pkts(). We keep holding the SQS_PROC as long as the
1017  * poll thread is running so that no other thread can acquire the
1018  * perimeter in between. If the squeue_drain gets done (no more work
1019  * left), it leaves the SQS_PROC set if poll thread is running.
1020  */
1021 
1022 /*
1023  * This is the squeue poll thread. In poll mode, it polls the underlying
1024  * TCP softring and feeds packets into the squeue. The worker thread then
1025  * drains the squeue. The poll thread also responds to control signals for
1026  * quiesceing, restarting, or cleanup of an squeue. These are driven by
1027  * control operations like plumb/unplumb or as a result of dynamic Rx ring
1028  * related operations that are driven from the mac layer.
1029  */
1030 static void
1031 squeue_polling_thread(squeue_t *sqp)
1032 {
1033 	kmutex_t *lock = &sqp->sq_lock;
1034 	kcondvar_t *async = &sqp->sq_poll_cv;
1035 	ip_mac_rx_t sq_get_pkts;
1036 	ip_accept_t ip_accept;
1037 	ill_rx_ring_t *sq_rx_ring;
1038 	ill_t *sq_ill;
1039 	mblk_t *head, *tail, *mp;
1040 	uint_t cnt;
1041 	void *sq_mac_handle;
1042 	callb_cpr_t cprinfo;
1043 	size_t bytes_to_pickup;
1044 	uint32_t ctl_state;
1045 
1046 	CALLB_CPR_INIT(&cprinfo, lock, callb_generic_cpr, "sq_poll");
1047 	mutex_enter(lock);
1048 
1049 	for (;;) {
1050 		CALLB_CPR_SAFE_BEGIN(&cprinfo);
1051 		cv_wait(async, lock);
1052 		CALLB_CPR_SAFE_END(&cprinfo, lock);
1053 
1054 		ctl_state = sqp->sq_state & (SQS_POLL_THR_CONTROL |
1055 		    SQS_POLL_THR_QUIESCED);
1056 		if (ctl_state != 0) {
1057 			/*
1058 			 * If the squeue is quiesced, then wait for a control
1059 			 * request. A quiesced squeue must not poll the
1060 			 * underlying soft ring.
1061 			 */
1062 			if (ctl_state == SQS_POLL_THR_QUIESCED)
1063 				continue;
1064 			/*
1065 			 * Act on control requests to quiesce, cleanup or
1066 			 * restart an squeue
1067 			 */
1068 			squeue_poll_thr_control(sqp);
1069 			continue;
1070 		}
1071 
1072 		if (!(sqp->sq_state & SQS_POLL_CAPAB))
1073 			continue;
1074 
1075 		ASSERT((sqp->sq_state &
1076 		    (SQS_PROC|SQS_POLLING|SQS_GET_PKTS)) ==
1077 		    (SQS_PROC|SQS_POLLING|SQS_GET_PKTS));
1078 
1079 poll_again:
1080 		sq_rx_ring = sqp->sq_rx_ring;
1081 		sq_get_pkts = sq_rx_ring->rr_rx;
1082 		sq_mac_handle = sq_rx_ring->rr_rx_handle;
1083 		ip_accept = sq_rx_ring->rr_ip_accept;
1084 		sq_ill = sq_rx_ring->rr_ill;
1085 		bytes_to_pickup = MAX_BYTES_TO_PICKUP;
1086 		mutex_exit(lock);
1087 		head = sq_get_pkts(sq_mac_handle, bytes_to_pickup);
1088 		mp = NULL;
1089 		if (head != NULL) {
1090 			/*
1091 			 * We got the packet chain from the mac layer. It
1092 			 * would be nice to be able to process it inline
1093 			 * for better performance but we need to give
1094 			 * IP a chance to look at this chain to ensure
1095 			 * that packets are really meant for this squeue
1096 			 * and do the IP processing.
1097 			 */
1098 			mp = ip_accept(sq_ill, sq_rx_ring, sqp, head,
1099 			    &tail, &cnt);
1100 		}
1101 		mutex_enter(lock);
1102 		if (mp != NULL) {
1103 			/*
1104 			 * The ip_accept function has already added an
1105 			 * ip_recv_attr_t mblk if that is needed.
1106 			 */
1107 			ENQUEUE_CHAIN(sqp, mp, tail, cnt);
1108 		}
1109 		ASSERT((sqp->sq_state &
1110 		    (SQS_PROC|SQS_POLLING|SQS_GET_PKTS)) ==
1111 		    (SQS_PROC|SQS_POLLING|SQS_GET_PKTS));
1112 
1113 		if (sqp->sq_first != NULL && !(sqp->sq_state & SQS_WORKER)) {
1114 			/*
1115 			 * We have packets to process and worker thread
1116 			 * is not running.  Check to see if poll thread is
1117 			 * allowed to process. Let it do processing only if it
1118 			 * picked up some packets from the NIC otherwise
1119 			 * wakeup the worker thread.
1120 			 */
1121 			if (mp != NULL) {
1122 				hrtime_t  now;
1123 
1124 				now = gethrtime();
1125 				sqp->sq_run = curthread;
1126 				sqp->sq_drain(sqp, SQS_POLL_PROC, now +
1127 				    squeue_drain_ns);
1128 				sqp->sq_run = NULL;
1129 
1130 				if (sqp->sq_first == NULL)
1131 					goto poll_again;
1132 
1133 				/*
1134 				 * Couldn't do the entire drain because the
1135 				 * time limit expired, let the
1136 				 * worker thread take over.
1137 				 */
1138 			}
1139 
1140 			sqp->sq_awaken = ddi_get_lbolt();
1141 			/*
1142 			 * Put the SQS_PROC_HELD on so the worker
1143 			 * thread can distinguish where its called from. We
1144 			 * can remove the SQS_PROC flag here and turn off the
1145 			 * polling so that it wouldn't matter who gets the
1146 			 * processing but we get better performance this way
1147 			 * and save the cost of turn polling off and possibly
1148 			 * on again as soon as we start draining again.
1149 			 *
1150 			 * We can't remove the SQS_PROC flag without turning
1151 			 * polling off until we can guarantee that control
1152 			 * will return to squeue_drain immediately.
1153 			 */
1154 			sqp->sq_state |= SQS_PROC_HELD;
1155 			sqp->sq_state &= ~SQS_GET_PKTS;
1156 			cv_signal(&sqp->sq_worker_cv);
1157 		} else if (sqp->sq_first == NULL &&
1158 		    !(sqp->sq_state & SQS_WORKER)) {
1159 			/*
1160 			 * Nothing queued and worker thread not running.
1161 			 * Since we hold the proc, no other thread is
1162 			 * processing the squeue. This means that there
1163 			 * is no work to be done and nothing is queued
1164 			 * in squeue or in NIC. Turn polling off and go
1165 			 * back to interrupt mode.
1166 			 */
1167 			sqp->sq_state &= ~(SQS_PROC|SQS_GET_PKTS);
1168 			/* LINTED: constant in conditional context */
1169 			SQS_POLLING_OFF(sqp, B_TRUE, sq_rx_ring);
1170 
1171 			/*
1172 			 * If there is a pending control operation
1173 			 * wake up the worker, since it is currently
1174 			 * not running.
1175 			 */
1176 			if (sqp->sq_state & SQS_WORKER_THR_CONTROL)
1177 				cv_signal(&sqp->sq_worker_cv);
1178 		} else {
1179 			/*
1180 			 * Worker thread is already running. We don't need
1181 			 * to do anything. Indicate that poll thread is done.
1182 			 */
1183 			sqp->sq_state &= ~SQS_GET_PKTS;
1184 		}
1185 		if (sqp->sq_state & SQS_POLL_THR_CONTROL) {
1186 			/*
1187 			 * Act on control requests to quiesce, cleanup or
1188 			 * restart an squeue
1189 			 */
1190 			squeue_poll_thr_control(sqp);
1191 		}
1192 	}
1193 }
1194 
1195 /*
1196  * The squeue worker thread acts on any control requests to quiesce, cleanup
1197  * or restart an ill_rx_ring_t by calling this function. The worker thread
1198  * synchronizes with the squeue poll thread to complete the request and finally
1199  * wakes up the requestor when the request is completed.
1200  */
1201 static void
1202 squeue_worker_thr_control(squeue_t *sqp)
1203 {
1204 	ill_t	*ill;
1205 	ill_rx_ring_t	*rx_ring;
1206 
1207 	ASSERT(MUTEX_HELD(&sqp->sq_lock));
1208 
1209 	if (sqp->sq_state & SQS_POLL_RESTART) {
1210 		/* Restart implies a previous quiesce. */
1211 		ASSERT((sqp->sq_state & (SQS_PROC_HELD |
1212 		    SQS_POLL_QUIESCE_DONE | SQS_PROC | SQS_WORKER)) ==
1213 		    (SQS_POLL_QUIESCE_DONE | SQS_PROC | SQS_WORKER));
1214 		/*
1215 		 * Request the squeue poll thread to restart and wait till
1216 		 * it actually restarts.
1217 		 */
1218 		sqp->sq_state &= ~SQS_POLL_QUIESCE_DONE;
1219 		sqp->sq_state |= SQS_POLL_THR_RESTART;
1220 		cv_signal(&sqp->sq_poll_cv);
1221 		while (sqp->sq_state & SQS_POLL_THR_QUIESCED)
1222 			cv_wait(&sqp->sq_worker_cv, &sqp->sq_lock);
1223 		sqp->sq_state &= ~(SQS_POLL_RESTART | SQS_PROC |
1224 		    SQS_WORKER);
1225 		/*
1226 		 * Signal any waiter that is waiting for the restart
1227 		 * to complete
1228 		 */
1229 		sqp->sq_state |= SQS_POLL_RESTART_DONE;
1230 		cv_signal(&sqp->sq_ctrlop_done_cv);
1231 		return;
1232 	}
1233 
1234 	if (sqp->sq_state & SQS_PROC_HELD) {
1235 		/* The squeue poll thread handed control to us */
1236 		ASSERT(sqp->sq_state & SQS_PROC);
1237 	}
1238 
1239 	/*
1240 	 * Prevent any other thread from processing the squeue
1241 	 * until we finish the control actions by setting SQS_PROC.
1242 	 * But allow ourself to reenter by setting SQS_WORKER
1243 	 */
1244 	sqp->sq_state |= (SQS_PROC | SQS_WORKER);
1245 
1246 	/* Signal the squeue poll thread and wait for it to quiesce itself */
1247 	if (!(sqp->sq_state & SQS_POLL_THR_QUIESCED)) {
1248 		sqp->sq_state |= SQS_POLL_THR_QUIESCE;
1249 		cv_signal(&sqp->sq_poll_cv);
1250 		while (!(sqp->sq_state & SQS_POLL_THR_QUIESCED))
1251 			cv_wait(&sqp->sq_worker_cv, &sqp->sq_lock);
1252 	}
1253 
1254 	rx_ring = sqp->sq_rx_ring;
1255 	ill = rx_ring->rr_ill;
1256 	/*
1257 	 * The lock hierarchy is as follows.
1258 	 * cpu_lock -> ill_lock -> sqset_lock -> sq_lock
1259 	 */
1260 	mutex_exit(&sqp->sq_lock);
1261 	mutex_enter(&ill->ill_lock);
1262 	mutex_enter(&sqp->sq_lock);
1263 
1264 	SQS_POLLING_OFF(sqp, (sqp->sq_state & SQS_POLL_CAPAB) != 0,
1265 	    sqp->sq_rx_ring);
1266 	sqp->sq_state &= ~(SQS_POLL_CAPAB | SQS_GET_PKTS | SQS_PROC_HELD);
1267 	if (sqp->sq_state & SQS_POLL_CLEANUP) {
1268 		/*
1269 		 * Disassociate this squeue from its ill_rx_ring_t.
1270 		 * The rr_sqp, sq_rx_ring fields are protected by the
1271 		 * corresponding squeue, ill_lock* and sq_lock. Holding any
1272 		 * of them will ensure that the ring to squeue mapping does
1273 		 * not change.
1274 		 */
1275 		ASSERT(!(sqp->sq_state & SQS_DEFAULT));
1276 
1277 		sqp->sq_rx_ring = NULL;
1278 		rx_ring->rr_sqp = NULL;
1279 
1280 		sqp->sq_state &= ~(SQS_POLL_CLEANUP | SQS_POLL_THR_QUIESCED |
1281 		    SQS_POLL_QUIESCE_DONE);
1282 		sqp->sq_ill = NULL;
1283 
1284 		rx_ring->rr_rx_handle = NULL;
1285 		rx_ring->rr_intr_handle = NULL;
1286 		rx_ring->rr_intr_enable = NULL;
1287 		rx_ring->rr_intr_disable = NULL;
1288 		sqp->sq_state |= SQS_POLL_CLEANUP_DONE;
1289 	} else {
1290 		sqp->sq_state &= ~SQS_POLL_QUIESCE;
1291 		sqp->sq_state |= SQS_POLL_QUIESCE_DONE;
1292 	}
1293 	/*
1294 	 * Signal any waiter that is waiting for the quiesce or cleanup
1295 	 * to complete and also wait for it to actually see and reset the
1296 	 * SQS_POLL_CLEANUP_DONE.
1297 	 */
1298 	cv_signal(&sqp->sq_ctrlop_done_cv);
1299 	mutex_exit(&ill->ill_lock);
1300 	if (sqp->sq_state & SQS_POLL_CLEANUP_DONE) {
1301 		cv_wait(&sqp->sq_worker_cv, &sqp->sq_lock);
1302 		sqp->sq_state &= ~(SQS_PROC | SQS_WORKER);
1303 	}
1304 }
1305 
1306 static void
1307 squeue_worker(squeue_t *sqp)
1308 {
1309 	kmutex_t *lock = &sqp->sq_lock;
1310 	kcondvar_t *async = &sqp->sq_worker_cv;
1311 	callb_cpr_t cprinfo;
1312 	hrtime_t now;
1313 
1314 	CALLB_CPR_INIT(&cprinfo, lock, callb_generic_cpr, "sq_worker");
1315 	mutex_enter(lock);
1316 
1317 	for (;;) {
1318 		for (;;) {
1319 			/*
1320 			 * If the poll thread has handed control to us
1321 			 * we need to break out of the wait.
1322 			 */
1323 			if (sqp->sq_state & SQS_PROC_HELD)
1324 				break;
1325 
1326 			/*
1327 			 * If the squeue is not being processed and we either
1328 			 * have messages to drain or some thread has signaled
1329 			 * some control activity we need to break
1330 			 */
1331 			if (!(sqp->sq_state & SQS_PROC) &&
1332 			    ((sqp->sq_state & SQS_WORKER_THR_CONTROL) ||
1333 			    (sqp->sq_first != NULL)))
1334 				break;
1335 
1336 			/*
1337 			 * If we have started some control action, then check
1338 			 * for the SQS_WORKER flag (since we don't
1339 			 * release the squeue) to make sure we own the squeue
1340 			 * and break out
1341 			 */
1342 			if ((sqp->sq_state & SQS_WORKER_THR_CONTROL) &&
1343 			    (sqp->sq_state & SQS_WORKER))
1344 				break;
1345 
1346 			CALLB_CPR_SAFE_BEGIN(&cprinfo);
1347 			cv_wait(async, lock);
1348 			CALLB_CPR_SAFE_END(&cprinfo, lock);
1349 		}
1350 		if (sqp->sq_state & SQS_WORKER_THR_CONTROL) {
1351 			squeue_worker_thr_control(sqp);
1352 			continue;
1353 		}
1354 		ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
1355 		    SQS_POLL_CLEANUP_DONE | SQS_POLL_QUIESCE_DONE |
1356 		    SQS_WORKER_THR_CONTROL | SQS_POLL_THR_CONTROL)));
1357 
1358 		if (sqp->sq_state & SQS_PROC_HELD)
1359 			sqp->sq_state &= ~SQS_PROC_HELD;
1360 
1361 		now = gethrtime();
1362 		sqp->sq_run = curthread;
1363 		sqp->sq_drain(sqp, SQS_WORKER, now +  squeue_drain_ns);
1364 		sqp->sq_run = NULL;
1365 	}
1366 }
1367 
1368 uintptr_t *
1369 squeue_getprivate(squeue_t *sqp, sqprivate_t p)
1370 {
1371 	ASSERT(p < SQPRIVATE_MAX);
1372 
1373 	return (&sqp->sq_private[p]);
1374 }
1375 
1376 /* ARGSUSED */
1377 void
1378 squeue_wakeup_conn(void *arg, mblk_t *mp, void *arg2, ip_recv_attr_t *dummy)
1379 {
1380 	conn_t *connp = (conn_t *)arg;
1381 	squeue_t *sqp = connp->conn_sqp;
1382 
1383 	/*
1384 	 * Mark the squeue as paused before waking up the thread stuck
1385 	 * in squeue_synch_enter().
1386 	 */
1387 	mutex_enter(&sqp->sq_lock);
1388 	sqp->sq_state |= SQS_PAUSE;
1389 
1390 	/*
1391 	 * Notify the thread that it's OK to proceed; that is done by
1392 	 * clearing the MSGWAITSYNC flag. The synch thread will free the mblk.
1393 	 */
1394 	ASSERT(mp->b_flag & MSGWAITSYNC);
1395 	mp->b_flag &= ~MSGWAITSYNC;
1396 	cv_broadcast(&connp->conn_sq_cv);
1397 
1398 	/*
1399 	 * We are doing something on behalf of another thread, so we have to
1400 	 * pause and wait until it finishes.
1401 	 */
1402 	while (sqp->sq_state & SQS_PAUSE) {
1403 		cv_wait(&sqp->sq_synch_cv, &sqp->sq_lock);
1404 	}
1405 	mutex_exit(&sqp->sq_lock);
1406 }
1407 
1408 int
1409 squeue_synch_enter(conn_t *connp, mblk_t *use_mp)
1410 {
1411 	squeue_t *sqp;
1412 
1413 again:
1414 	sqp = connp->conn_sqp;
1415 
1416 	mutex_enter(&sqp->sq_lock);
1417 	if (sqp->sq_first == NULL && !(sqp->sq_state & SQS_PROC)) {
1418 		/*
1419 		 * We are OK to proceed if the squeue is empty, and
1420 		 * no one owns the squeue.
1421 		 *
1422 		 * The caller won't own the squeue as this is called from the
1423 		 * application.
1424 		 */
1425 		ASSERT(sqp->sq_run == NULL);
1426 
1427 		sqp->sq_state |= SQS_PROC;
1428 		sqp->sq_run = curthread;
1429 		mutex_exit(&sqp->sq_lock);
1430 
1431 		/*
1432 		 * Handle squeue switching. The conn's squeue can only change
1433 		 * while there is a thread in the squeue, which is why we do
1434 		 * the check after entering the squeue. If it has changed, exit
1435 		 * this squeue and redo everything with the new sqeueue.
1436 		 */
1437 		if (sqp != connp->conn_sqp) {
1438 			mutex_enter(&sqp->sq_lock);
1439 			sqp->sq_state &= ~SQS_PROC;
1440 			sqp->sq_run = NULL;
1441 			mutex_exit(&sqp->sq_lock);
1442 			goto again;
1443 		}
1444 #if SQUEUE_DEBUG
1445 		sqp->sq_curmp = NULL;
1446 		sqp->sq_curproc = NULL;
1447 		sqp->sq_connp = connp;
1448 #endif
1449 		connp->conn_on_sqp = B_TRUE;
1450 		return (0);
1451 	} else {
1452 		mblk_t  *mp;
1453 
1454 		mp = (use_mp == NULL) ? allocb(0, BPRI_MED) : use_mp;
1455 		if (mp == NULL) {
1456 			mutex_exit(&sqp->sq_lock);
1457 			return (ENOMEM);
1458 		}
1459 
1460 		/*
1461 		 * We mark the mblk as awaiting synchronous squeue access
1462 		 * by setting the MSGWAITSYNC flag. Once squeue_wakeup_conn
1463 		 * fires, MSGWAITSYNC is cleared, at which point we know we
1464 		 * have exclusive access.
1465 		 */
1466 		mp->b_flag |= MSGWAITSYNC;
1467 
1468 		CONN_INC_REF(connp);
1469 		SET_SQUEUE(mp, squeue_wakeup_conn, connp);
1470 		ENQUEUE_CHAIN(sqp, mp, mp, 1);
1471 
1472 		ASSERT(sqp->sq_run != curthread);
1473 
1474 		/* Wait until the enqueued mblk get processed. */
1475 		while (mp->b_flag & MSGWAITSYNC)
1476 			cv_wait(&connp->conn_sq_cv, &sqp->sq_lock);
1477 		mutex_exit(&sqp->sq_lock);
1478 
1479 		if (use_mp == NULL)
1480 			freeb(mp);
1481 
1482 		return (0);
1483 	}
1484 }
1485 
1486 void
1487 squeue_synch_exit(conn_t *connp)
1488 {
1489 	squeue_t *sqp = connp->conn_sqp;
1490 
1491 	mutex_enter(&sqp->sq_lock);
1492 	if (sqp->sq_run == curthread) {
1493 		ASSERT(sqp->sq_state & SQS_PROC);
1494 
1495 		sqp->sq_state &= ~SQS_PROC;
1496 		sqp->sq_run = NULL;
1497 		connp->conn_on_sqp = B_FALSE;
1498 
1499 		if (sqp->sq_first == NULL) {
1500 			mutex_exit(&sqp->sq_lock);
1501 		} else {
1502 			/*
1503 			 * If this was a normal thread, then it would
1504 			 * (most likely) continue processing the pending
1505 			 * requests. Since the just completed operation
1506 			 * was executed synchronously, the thread should
1507 			 * not be delayed. To compensate, wake up the
1508 			 * worker thread right away when there are outstanding
1509 			 * requests.
1510 			 */
1511 			sqp->sq_awaken = ddi_get_lbolt();
1512 			cv_signal(&sqp->sq_worker_cv);
1513 			mutex_exit(&sqp->sq_lock);
1514 		}
1515 	} else {
1516 		/*
1517 		 * The caller doesn't own the squeue, clear the SQS_PAUSE flag,
1518 		 * and wake up the squeue owner, such that owner can continue
1519 		 * processing.
1520 		 */
1521 		ASSERT(sqp->sq_state & SQS_PAUSE);
1522 		sqp->sq_state &= ~SQS_PAUSE;
1523 
1524 		/* There should be only one thread blocking on sq_synch_cv. */
1525 		cv_signal(&sqp->sq_synch_cv);
1526 		mutex_exit(&sqp->sq_lock);
1527 	}
1528 }
1529