xref: /illumos-gate/usr/src/uts/common/fs/zfs/txg.c (revision 0bb073995ac5a95bd35f2dd790df1ea3d8c2d507)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 
26 #include <sys/zfs_context.h>
27 #include <sys/txg_impl.h>
28 #include <sys/dmu_impl.h>
29 #include <sys/dsl_pool.h>
30 #include <sys/callb.h>
31 
32 /*
33  * Pool-wide transaction groups.
34  */
35 
36 static void txg_sync_thread(dsl_pool_t *dp);
37 static void txg_quiesce_thread(dsl_pool_t *dp);
38 
39 int zfs_txg_timeout = 30;	/* max seconds worth of delta per txg */
40 
41 /*
42  * Prepare the txg subsystem.
43  */
44 void
45 txg_init(dsl_pool_t *dp, uint64_t txg)
46 {
47 	tx_state_t *tx = &dp->dp_tx;
48 	int c;
49 	bzero(tx, sizeof (tx_state_t));
50 
51 	tx->tx_cpu = kmem_zalloc(max_ncpus * sizeof (tx_cpu_t), KM_SLEEP);
52 
53 	for (c = 0; c < max_ncpus; c++) {
54 		int i;
55 
56 		mutex_init(&tx->tx_cpu[c].tc_lock, NULL, MUTEX_DEFAULT, NULL);
57 		for (i = 0; i < TXG_SIZE; i++) {
58 			cv_init(&tx->tx_cpu[c].tc_cv[i], NULL, CV_DEFAULT,
59 			    NULL);
60 		}
61 	}
62 
63 	rw_init(&tx->tx_suspend, NULL, RW_DEFAULT, NULL);
64 	mutex_init(&tx->tx_sync_lock, NULL, MUTEX_DEFAULT, NULL);
65 
66 	tx->tx_open_txg = txg;
67 }
68 
69 /*
70  * Close down the txg subsystem.
71  */
72 void
73 txg_fini(dsl_pool_t *dp)
74 {
75 	tx_state_t *tx = &dp->dp_tx;
76 	int c;
77 
78 	ASSERT(tx->tx_threads == 0);
79 
80 	rw_destroy(&tx->tx_suspend);
81 	mutex_destroy(&tx->tx_sync_lock);
82 
83 	for (c = 0; c < max_ncpus; c++) {
84 		int i;
85 
86 		mutex_destroy(&tx->tx_cpu[c].tc_lock);
87 		for (i = 0; i < TXG_SIZE; i++)
88 			cv_destroy(&tx->tx_cpu[c].tc_cv[i]);
89 	}
90 
91 	kmem_free(tx->tx_cpu, max_ncpus * sizeof (tx_cpu_t));
92 
93 	bzero(tx, sizeof (tx_state_t));
94 }
95 
96 /*
97  * Start syncing transaction groups.
98  */
99 void
100 txg_sync_start(dsl_pool_t *dp)
101 {
102 	tx_state_t *tx = &dp->dp_tx;
103 
104 	mutex_enter(&tx->tx_sync_lock);
105 
106 	dprintf("pool %p\n", dp);
107 
108 	ASSERT(tx->tx_threads == 0);
109 
110 	tx->tx_threads = 2;
111 
112 	tx->tx_quiesce_thread = thread_create(NULL, 0, txg_quiesce_thread,
113 	    dp, 0, &p0, TS_RUN, minclsyspri);
114 
115 	/*
116 	 * The sync thread can need a larger-than-default stack size on
117 	 * 32-bit x86.  This is due in part to nested pools and
118 	 * scrub_visitbp() recursion.
119 	 */
120 	tx->tx_sync_thread = thread_create(NULL, 12<<10, txg_sync_thread,
121 	    dp, 0, &p0, TS_RUN, minclsyspri);
122 
123 	mutex_exit(&tx->tx_sync_lock);
124 }
125 
126 static void
127 txg_thread_enter(tx_state_t *tx, callb_cpr_t *cpr)
128 {
129 	CALLB_CPR_INIT(cpr, &tx->tx_sync_lock, callb_generic_cpr, FTAG);
130 	mutex_enter(&tx->tx_sync_lock);
131 }
132 
133 static void
134 txg_thread_exit(tx_state_t *tx, callb_cpr_t *cpr, kthread_t **tpp)
135 {
136 	ASSERT(*tpp != NULL);
137 	*tpp = NULL;
138 	tx->tx_threads--;
139 	cv_broadcast(&tx->tx_exit_cv);
140 	CALLB_CPR_EXIT(cpr);		/* drops &tx->tx_sync_lock */
141 	thread_exit();
142 }
143 
144 static void
145 txg_thread_wait(tx_state_t *tx, callb_cpr_t *cpr, kcondvar_t *cv, uint64_t time)
146 {
147 	CALLB_CPR_SAFE_BEGIN(cpr);
148 
149 	if (time)
150 		(void) cv_timedwait(cv, &tx->tx_sync_lock, lbolt + time);
151 	else
152 		cv_wait(cv, &tx->tx_sync_lock);
153 
154 	CALLB_CPR_SAFE_END(cpr, &tx->tx_sync_lock);
155 }
156 
157 /*
158  * Stop syncing transaction groups.
159  */
160 void
161 txg_sync_stop(dsl_pool_t *dp)
162 {
163 	tx_state_t *tx = &dp->dp_tx;
164 
165 	dprintf("pool %p\n", dp);
166 	/*
167 	 * Finish off any work in progress.
168 	 */
169 	ASSERT(tx->tx_threads == 2);
170 	txg_wait_synced(dp, 0);
171 
172 	/*
173 	 * Wake all sync threads and wait for them to die.
174 	 */
175 	mutex_enter(&tx->tx_sync_lock);
176 
177 	ASSERT(tx->tx_threads == 2);
178 
179 	tx->tx_exiting = 1;
180 
181 	cv_broadcast(&tx->tx_quiesce_more_cv);
182 	cv_broadcast(&tx->tx_quiesce_done_cv);
183 	cv_broadcast(&tx->tx_sync_more_cv);
184 
185 	while (tx->tx_threads != 0)
186 		cv_wait(&tx->tx_exit_cv, &tx->tx_sync_lock);
187 
188 	tx->tx_exiting = 0;
189 
190 	mutex_exit(&tx->tx_sync_lock);
191 }
192 
193 uint64_t
194 txg_hold_open(dsl_pool_t *dp, txg_handle_t *th)
195 {
196 	tx_state_t *tx = &dp->dp_tx;
197 	tx_cpu_t *tc = &tx->tx_cpu[CPU_SEQID];
198 	uint64_t txg;
199 
200 	mutex_enter(&tc->tc_lock);
201 
202 	txg = tx->tx_open_txg;
203 	tc->tc_count[txg & TXG_MASK]++;
204 
205 	th->th_cpu = tc;
206 	th->th_txg = txg;
207 
208 	return (txg);
209 }
210 
211 void
212 txg_rele_to_quiesce(txg_handle_t *th)
213 {
214 	tx_cpu_t *tc = th->th_cpu;
215 
216 	mutex_exit(&tc->tc_lock);
217 }
218 
219 void
220 txg_rele_to_sync(txg_handle_t *th)
221 {
222 	tx_cpu_t *tc = th->th_cpu;
223 	int g = th->th_txg & TXG_MASK;
224 
225 	mutex_enter(&tc->tc_lock);
226 	ASSERT(tc->tc_count[g] != 0);
227 	if (--tc->tc_count[g] == 0)
228 		cv_broadcast(&tc->tc_cv[g]);
229 	mutex_exit(&tc->tc_lock);
230 
231 	th->th_cpu = NULL;	/* defensive */
232 }
233 
234 static void
235 txg_quiesce(dsl_pool_t *dp, uint64_t txg)
236 {
237 	tx_state_t *tx = &dp->dp_tx;
238 	int g = txg & TXG_MASK;
239 	int c;
240 
241 	/*
242 	 * Grab all tx_cpu locks so nobody else can get into this txg.
243 	 */
244 	for (c = 0; c < max_ncpus; c++)
245 		mutex_enter(&tx->tx_cpu[c].tc_lock);
246 
247 	ASSERT(txg == tx->tx_open_txg);
248 	tx->tx_open_txg++;
249 
250 	/*
251 	 * Now that we've incremented tx_open_txg, we can let threads
252 	 * enter the next transaction group.
253 	 */
254 	for (c = 0; c < max_ncpus; c++)
255 		mutex_exit(&tx->tx_cpu[c].tc_lock);
256 
257 	/*
258 	 * Quiesce the transaction group by waiting for everyone to txg_exit().
259 	 */
260 	for (c = 0; c < max_ncpus; c++) {
261 		tx_cpu_t *tc = &tx->tx_cpu[c];
262 		mutex_enter(&tc->tc_lock);
263 		while (tc->tc_count[g] != 0)
264 			cv_wait(&tc->tc_cv[g], &tc->tc_lock);
265 		mutex_exit(&tc->tc_lock);
266 	}
267 }
268 
269 static void
270 txg_sync_thread(dsl_pool_t *dp)
271 {
272 	tx_state_t *tx = &dp->dp_tx;
273 	callb_cpr_t cpr;
274 	uint64_t start, delta;
275 
276 	txg_thread_enter(tx, &cpr);
277 
278 	start = delta = 0;
279 	for (;;) {
280 		uint64_t timer, timeout = zfs_txg_timeout * hz;
281 		uint64_t txg;
282 
283 		/*
284 		 * We sync when there's someone waiting on us, or the
285 		 * quiesce thread has handed off a txg to us, or we have
286 		 * reached our timeout.
287 		 */
288 		timer = (delta >= timeout ? 0 : timeout - delta);
289 		while (!tx->tx_exiting && timer > 0 &&
290 		    tx->tx_synced_txg >= tx->tx_sync_txg_waiting &&
291 		    tx->tx_quiesced_txg == 0) {
292 			dprintf("waiting; tx_synced=%llu waiting=%llu dp=%p\n",
293 			    tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
294 			txg_thread_wait(tx, &cpr, &tx->tx_sync_more_cv, timer);
295 			delta = lbolt - start;
296 			timer = (delta > timeout ? 0 : timeout - delta);
297 		}
298 
299 		/*
300 		 * Wait until the quiesce thread hands off a txg to us,
301 		 * prompting it to do so if necessary.
302 		 */
303 		while (!tx->tx_exiting && tx->tx_quiesced_txg == 0) {
304 			if (tx->tx_quiesce_txg_waiting < tx->tx_open_txg+1)
305 				tx->tx_quiesce_txg_waiting = tx->tx_open_txg+1;
306 			cv_broadcast(&tx->tx_quiesce_more_cv);
307 			txg_thread_wait(tx, &cpr, &tx->tx_quiesce_done_cv, 0);
308 		}
309 
310 		if (tx->tx_exiting)
311 			txg_thread_exit(tx, &cpr, &tx->tx_sync_thread);
312 
313 		rw_enter(&tx->tx_suspend, RW_WRITER);
314 
315 		/*
316 		 * Consume the quiesced txg which has been handed off to
317 		 * us.  This may cause the quiescing thread to now be
318 		 * able to quiesce another txg, so we must signal it.
319 		 */
320 		txg = tx->tx_quiesced_txg;
321 		tx->tx_quiesced_txg = 0;
322 		tx->tx_syncing_txg = txg;
323 		cv_broadcast(&tx->tx_quiesce_more_cv);
324 		rw_exit(&tx->tx_suspend);
325 
326 		dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
327 		    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
328 		mutex_exit(&tx->tx_sync_lock);
329 
330 		start = lbolt;
331 		spa_sync(dp->dp_spa, txg);
332 		delta = lbolt - start;
333 
334 		mutex_enter(&tx->tx_sync_lock);
335 		rw_enter(&tx->tx_suspend, RW_WRITER);
336 		tx->tx_synced_txg = txg;
337 		tx->tx_syncing_txg = 0;
338 		rw_exit(&tx->tx_suspend);
339 		cv_broadcast(&tx->tx_sync_done_cv);
340 	}
341 }
342 
343 static void
344 txg_quiesce_thread(dsl_pool_t *dp)
345 {
346 	tx_state_t *tx = &dp->dp_tx;
347 	callb_cpr_t cpr;
348 
349 	txg_thread_enter(tx, &cpr);
350 
351 	for (;;) {
352 		uint64_t txg;
353 
354 		/*
355 		 * We quiesce when there's someone waiting on us.
356 		 * However, we can only have one txg in "quiescing" or
357 		 * "quiesced, waiting to sync" state.  So we wait until
358 		 * the "quiesced, waiting to sync" txg has been consumed
359 		 * by the sync thread.
360 		 */
361 		while (!tx->tx_exiting &&
362 		    (tx->tx_open_txg >= tx->tx_quiesce_txg_waiting ||
363 		    tx->tx_quiesced_txg != 0))
364 			txg_thread_wait(tx, &cpr, &tx->tx_quiesce_more_cv, 0);
365 
366 		if (tx->tx_exiting)
367 			txg_thread_exit(tx, &cpr, &tx->tx_quiesce_thread);
368 
369 		txg = tx->tx_open_txg;
370 		dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
371 		    txg, tx->tx_quiesce_txg_waiting,
372 		    tx->tx_sync_txg_waiting);
373 		mutex_exit(&tx->tx_sync_lock);
374 		txg_quiesce(dp, txg);
375 		mutex_enter(&tx->tx_sync_lock);
376 
377 		/*
378 		 * Hand this txg off to the sync thread.
379 		 */
380 		dprintf("quiesce done, handing off txg %llu\n", txg);
381 		tx->tx_quiesced_txg = txg;
382 		cv_broadcast(&tx->tx_sync_more_cv);
383 		cv_broadcast(&tx->tx_quiesce_done_cv);
384 	}
385 }
386 
387 /*
388  * Delay this thread by 'ticks' if we are still in the open transaction
389  * group and there is already a waiting txg quiesing or quiesced.  Abort
390  * the delay if this txg stalls or enters the quiesing state.
391  */
392 void
393 txg_delay(dsl_pool_t *dp, uint64_t txg, int ticks)
394 {
395 	tx_state_t *tx = &dp->dp_tx;
396 	int timeout = lbolt + ticks;
397 
398 	/* don't delay if this txg could transition to quiesing immediately */
399 	if (tx->tx_open_txg > txg ||
400 	    tx->tx_syncing_txg == txg-1 || tx->tx_synced_txg == txg-1)
401 		return;
402 
403 	mutex_enter(&tx->tx_sync_lock);
404 	if (tx->tx_open_txg > txg || tx->tx_synced_txg == txg-1) {
405 		mutex_exit(&tx->tx_sync_lock);
406 		return;
407 	}
408 
409 	while (lbolt < timeout &&
410 	    tx->tx_syncing_txg < txg-1 && !txg_stalled(dp))
411 		(void) cv_timedwait(&tx->tx_quiesce_more_cv, &tx->tx_sync_lock,
412 		    timeout);
413 
414 	mutex_exit(&tx->tx_sync_lock);
415 }
416 
417 void
418 txg_wait_synced(dsl_pool_t *dp, uint64_t txg)
419 {
420 	tx_state_t *tx = &dp->dp_tx;
421 
422 	mutex_enter(&tx->tx_sync_lock);
423 	ASSERT(tx->tx_threads == 2);
424 	if (txg == 0)
425 		txg = tx->tx_open_txg;
426 	if (tx->tx_sync_txg_waiting < txg)
427 		tx->tx_sync_txg_waiting = txg;
428 	dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
429 	    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
430 	while (tx->tx_synced_txg < txg) {
431 		dprintf("broadcasting sync more "
432 		    "tx_synced=%llu waiting=%llu dp=%p\n",
433 		    tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
434 		cv_broadcast(&tx->tx_sync_more_cv);
435 		cv_wait(&tx->tx_sync_done_cv, &tx->tx_sync_lock);
436 	}
437 	mutex_exit(&tx->tx_sync_lock);
438 }
439 
440 void
441 txg_wait_open(dsl_pool_t *dp, uint64_t txg)
442 {
443 	tx_state_t *tx = &dp->dp_tx;
444 
445 	mutex_enter(&tx->tx_sync_lock);
446 	ASSERT(tx->tx_threads == 2);
447 	if (txg == 0)
448 		txg = tx->tx_open_txg + 1;
449 	if (tx->tx_quiesce_txg_waiting < txg)
450 		tx->tx_quiesce_txg_waiting = txg;
451 	dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
452 	    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
453 	while (tx->tx_open_txg < txg) {
454 		cv_broadcast(&tx->tx_quiesce_more_cv);
455 		cv_wait(&tx->tx_quiesce_done_cv, &tx->tx_sync_lock);
456 	}
457 	mutex_exit(&tx->tx_sync_lock);
458 }
459 
460 boolean_t
461 txg_stalled(dsl_pool_t *dp)
462 {
463 	tx_state_t *tx = &dp->dp_tx;
464 	return (tx->tx_quiesce_txg_waiting > tx->tx_open_txg);
465 }
466 
467 boolean_t
468 txg_sync_waiting(dsl_pool_t *dp)
469 {
470 	tx_state_t *tx = &dp->dp_tx;
471 
472 	return (tx->tx_syncing_txg <= tx->tx_sync_txg_waiting ||
473 	    tx->tx_quiesced_txg != 0);
474 }
475 
476 void
477 txg_suspend(dsl_pool_t *dp)
478 {
479 	tx_state_t *tx = &dp->dp_tx;
480 	/* XXX some code paths suspend when they are already suspended! */
481 	rw_enter(&tx->tx_suspend, RW_READER);
482 }
483 
484 void
485 txg_resume(dsl_pool_t *dp)
486 {
487 	tx_state_t *tx = &dp->dp_tx;
488 	rw_exit(&tx->tx_suspend);
489 }
490 
491 /*
492  * Per-txg object lists.
493  */
494 void
495 txg_list_create(txg_list_t *tl, size_t offset)
496 {
497 	int t;
498 
499 	mutex_init(&tl->tl_lock, NULL, MUTEX_DEFAULT, NULL);
500 
501 	tl->tl_offset = offset;
502 
503 	for (t = 0; t < TXG_SIZE; t++)
504 		tl->tl_head[t] = NULL;
505 }
506 
507 void
508 txg_list_destroy(txg_list_t *tl)
509 {
510 	int t;
511 
512 	for (t = 0; t < TXG_SIZE; t++)
513 		ASSERT(txg_list_empty(tl, t));
514 
515 	mutex_destroy(&tl->tl_lock);
516 }
517 
518 int
519 txg_list_empty(txg_list_t *tl, uint64_t txg)
520 {
521 	return (tl->tl_head[txg & TXG_MASK] == NULL);
522 }
523 
524 /*
525  * Add an entry to the list.
526  * Returns 0 if it's a new entry, 1 if it's already there.
527  */
528 int
529 txg_list_add(txg_list_t *tl, void *p, uint64_t txg)
530 {
531 	int t = txg & TXG_MASK;
532 	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
533 	int already_on_list;
534 
535 	mutex_enter(&tl->tl_lock);
536 	already_on_list = tn->tn_member[t];
537 	if (!already_on_list) {
538 		tn->tn_member[t] = 1;
539 		tn->tn_next[t] = tl->tl_head[t];
540 		tl->tl_head[t] = tn;
541 	}
542 	mutex_exit(&tl->tl_lock);
543 
544 	return (already_on_list);
545 }
546 
547 /*
548  * Remove the head of the list and return it.
549  */
550 void *
551 txg_list_remove(txg_list_t *tl, uint64_t txg)
552 {
553 	int t = txg & TXG_MASK;
554 	txg_node_t *tn;
555 	void *p = NULL;
556 
557 	mutex_enter(&tl->tl_lock);
558 	if ((tn = tl->tl_head[t]) != NULL) {
559 		p = (char *)tn - tl->tl_offset;
560 		tl->tl_head[t] = tn->tn_next[t];
561 		tn->tn_next[t] = NULL;
562 		tn->tn_member[t] = 0;
563 	}
564 	mutex_exit(&tl->tl_lock);
565 
566 	return (p);
567 }
568 
569 /*
570  * Remove a specific item from the list and return it.
571  */
572 void *
573 txg_list_remove_this(txg_list_t *tl, void *p, uint64_t txg)
574 {
575 	int t = txg & TXG_MASK;
576 	txg_node_t *tn, **tp;
577 
578 	mutex_enter(&tl->tl_lock);
579 
580 	for (tp = &tl->tl_head[t]; (tn = *tp) != NULL; tp = &tn->tn_next[t]) {
581 		if ((char *)tn - tl->tl_offset == p) {
582 			*tp = tn->tn_next[t];
583 			tn->tn_next[t] = NULL;
584 			tn->tn_member[t] = 0;
585 			mutex_exit(&tl->tl_lock);
586 			return (p);
587 		}
588 	}
589 
590 	mutex_exit(&tl->tl_lock);
591 
592 	return (NULL);
593 }
594 
595 int
596 txg_list_member(txg_list_t *tl, void *p, uint64_t txg)
597 {
598 	int t = txg & TXG_MASK;
599 	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
600 
601 	return (tn->tn_member[t]);
602 }
603 
604 /*
605  * Walk a txg list -- only safe if you know it's not changing.
606  */
607 void *
608 txg_list_head(txg_list_t *tl, uint64_t txg)
609 {
610 	int t = txg & TXG_MASK;
611 	txg_node_t *tn = tl->tl_head[t];
612 
613 	return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
614 }
615 
616 void *
617 txg_list_next(txg_list_t *tl, void *p, uint64_t txg)
618 {
619 	int t = txg & TXG_MASK;
620 	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
621 
622 	tn = tn->tn_next[t];
623 
624 	return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
625 }
626