xref: /linux/drivers/block/drbd/drbd_main.c (revision cdb138080b78146d1cdadba9f5dadbeb97445b91)
1 /*
2    drbd.c
3 
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5 
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9 
10    Thanks to Carter Burden, Bart Grantham and Gennadiy Nerubayev
11    from Logicworks, Inc. for making SDP replication support possible.
12 
13    drbd is free software; you can redistribute it and/or modify
14    it under the terms of the GNU General Public License as published by
15    the Free Software Foundation; either version 2, or (at your option)
16    any later version.
17 
18    drbd is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21    GNU General Public License for more details.
22 
23    You should have received a copy of the GNU General Public License
24    along with drbd; see the file COPYING.  If not, write to
25    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
26 
27  */
28 
29 #include <linux/module.h>
30 #include <linux/drbd.h>
31 #include <asm/uaccess.h>
32 #include <asm/types.h>
33 #include <net/sock.h>
34 #include <linux/ctype.h>
35 #include <linux/smp_lock.h>
36 #include <linux/fs.h>
37 #include <linux/file.h>
38 #include <linux/proc_fs.h>
39 #include <linux/init.h>
40 #include <linux/mm.h>
41 #include <linux/memcontrol.h>
42 #include <linux/mm_inline.h>
43 #include <linux/slab.h>
44 #include <linux/random.h>
45 #include <linux/reboot.h>
46 #include <linux/notifier.h>
47 #include <linux/kthread.h>
48 
49 #define __KERNEL_SYSCALLS__
50 #include <linux/unistd.h>
51 #include <linux/vmalloc.h>
52 
53 #include <linux/drbd_limits.h>
54 #include "drbd_int.h"
55 #include "drbd_req.h" /* only for _req_mod in tl_release and tl_clear */
56 
57 #include "drbd_vli.h"
58 
59 struct after_state_chg_work {
60 	struct drbd_work w;
61 	union drbd_state os;
62 	union drbd_state ns;
63 	enum chg_state_flags flags;
64 	struct completion *done;
65 };
66 
67 int drbdd_init(struct drbd_thread *);
68 int drbd_worker(struct drbd_thread *);
69 int drbd_asender(struct drbd_thread *);
70 
71 int drbd_init(void);
72 static int drbd_open(struct block_device *bdev, fmode_t mode);
73 static int drbd_release(struct gendisk *gd, fmode_t mode);
74 static int w_after_state_ch(struct drbd_conf *mdev, struct drbd_work *w, int unused);
75 static void after_state_ch(struct drbd_conf *mdev, union drbd_state os,
76 			   union drbd_state ns, enum chg_state_flags flags);
77 static int w_md_sync(struct drbd_conf *mdev, struct drbd_work *w, int unused);
78 static void md_sync_timer_fn(unsigned long data);
79 static int w_bitmap_io(struct drbd_conf *mdev, struct drbd_work *w, int unused);
80 
81 MODULE_AUTHOR("Philipp Reisner <phil@linbit.com>, "
82 	      "Lars Ellenberg <lars@linbit.com>");
83 MODULE_DESCRIPTION("drbd - Distributed Replicated Block Device v" REL_VERSION);
84 MODULE_VERSION(REL_VERSION);
85 MODULE_LICENSE("GPL");
86 MODULE_PARM_DESC(minor_count, "Maximum number of drbd devices (1-255)");
87 MODULE_ALIAS_BLOCKDEV_MAJOR(DRBD_MAJOR);
88 
89 #include <linux/moduleparam.h>
90 /* allow_open_on_secondary */
91 MODULE_PARM_DESC(allow_oos, "DONT USE!");
92 /* thanks to these macros, if compiled into the kernel (not as a module),
93  * this becomes the boot parameter drbd.minor_count */
94 module_param(minor_count, uint, 0444);
95 module_param(disable_sendpage, bool, 0644);
96 module_param(allow_oos, bool, 0);
97 module_param(cn_idx, uint, 0444);
98 module_param(proc_details, int, 0644);
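/*
 * Usage sketch (illustrative only, not part of the driver): when built as
 * a module, the parameters above can be set at load time, e.g.
 *
 *	modprobe drbd minor_count=8 allow_oos=0
 *
 * When built into the kernel, the same knobs become boot parameters with
 * a "drbd." prefix, e.g. drbd.minor_count=8 on the kernel command line.
 */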
99 
100 #ifdef CONFIG_DRBD_FAULT_INJECTION
101 int enable_faults;
102 int fault_rate;
103 static int fault_count;
104 int fault_devs;
105 /* bitmap of enabled faults */
106 module_param(enable_faults, int, 0664);
107 /* fault rate % value - applies to all enabled faults */
108 module_param(fault_rate, int, 0664);
109 /* count of faults inserted */
110 module_param(fault_count, int, 0664);
111 /* bitmap of devices to insert faults on */
112 module_param(fault_devs, int, 0644);
113 #endif
114 
115 /* module parameter variables, defined here */
116 unsigned int minor_count = 32;
117 int disable_sendpage;
118 int allow_oos;
119 unsigned int cn_idx = CN_IDX_DRBD;
120 int proc_details;       /* Detail level in proc drbd */
121 
122 /* Module parameter for setting the user mode helper program
123  * to run. Default is /sbin/drbdadm */
124 char usermode_helper[80] = "/sbin/drbdadm";
125 
126 module_param_string(usermode_helper, usermode_helper, sizeof(usermode_helper), 0644);
127 
128 /* in 2.6.x, our device mapping and config info contains our virtual gendisks
129  * as member "struct gendisk *vdisk;"
130  */
131 struct drbd_conf **minor_table;
132 
133 struct kmem_cache *drbd_request_cache;
134 struct kmem_cache *drbd_ee_cache;	/* epoch entries */
135 struct kmem_cache *drbd_bm_ext_cache;	/* bitmap extents */
136 struct kmem_cache *drbd_al_ext_cache;	/* activity log extents */
137 mempool_t *drbd_request_mempool;
138 mempool_t *drbd_ee_mempool;
139 
140 /* I do not use a standard mempool, because:
141    1) I want to hand out the pre-allocated objects first.
142    2) I want to be able to interrupt sleeping allocation with a signal.
143    Note: This is a singly linked list; the next pointer is the private
144 	 member of struct page.
145  */
146 struct page *drbd_pp_pool;
147 spinlock_t   drbd_pp_lock;
148 int          drbd_pp_vacant;
149 wait_queue_head_t drbd_pp_wait;
150 
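/*
 * Illustrative sketch (assumption for clarity, not code used by the
 * driver): with the pool above, handing out one pre-allocated page could
 * look like this, following the next pointer kept in page_private():
 *
 *	static struct page *example_pp_pop(void)
 *	{
 *		struct page *page;
 *
 *		spin_lock(&drbd_pp_lock);
 *		page = drbd_pp_pool;
 *		if (page) {
 *			drbd_pp_pool = (struct page *)page_private(page);
 *			set_page_private(page, 0);
 *			drbd_pp_vacant--;
 *		}
 *		spin_unlock(&drbd_pp_lock);
 *		return page;
 *	}
 */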
151 DEFINE_RATELIMIT_STATE(drbd_ratelimit_state, 5 * HZ, 5);
152 
153 static const struct block_device_operations drbd_ops = {
154 	.owner =   THIS_MODULE,
155 	.open =    drbd_open,
156 	.release = drbd_release,
157 };
158 
159 #define ARRY_SIZE(A) (sizeof(A)/sizeof(A[0]))
160 
161 #ifdef __CHECKER__
162 /* When checking with sparse, and this is an inline function, sparse will
163    give tons of false positives. When this is a real function, sparse works.
164  */
165 int _get_ldev_if_state(struct drbd_conf *mdev, enum drbd_disk_state mins)
166 {
167 	int io_allowed;
168 
169 	atomic_inc(&mdev->local_cnt);
170 	io_allowed = (mdev->state.disk >= mins);
171 	if (!io_allowed) {
172 		if (atomic_dec_and_test(&mdev->local_cnt))
173 			wake_up(&mdev->misc_wait);
174 	}
175 	return io_allowed;
176 }
177 
178 #endif
179 
180 /**
181  * DOC: The transfer log
182  *
183  * The transfer log is a singly linked list of &struct drbd_tl_epoch objects.
184  * mdev->newest_tle points to the head, mdev->oldest_tle points to the tail
185  * of the list. There is always at least one &struct drbd_tl_epoch object.
186  *
187  * Each &struct drbd_tl_epoch has a circular doubly linked list of requests
188  * attached.
189  */
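/*
 * Illustrative layout (sketch derived from the description above):
 *
 *   oldest_tle -> [epoch] -> [epoch] -> ... -> [epoch] <- newest_tle
 *                    |          |                 |
 *                requests    requests          requests
 *               (circular)  (circular)        (circular)
 *
 * The ->next pointers run from the oldest towards the newest epoch.
 */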
190 static int tl_init(struct drbd_conf *mdev)
191 {
192 	struct drbd_tl_epoch *b;
193 
194 	/* during device minor initialization, we may well use GFP_KERNEL */
195 	b = kmalloc(sizeof(struct drbd_tl_epoch), GFP_KERNEL);
196 	if (!b)
197 		return 0;
198 	INIT_LIST_HEAD(&b->requests);
199 	INIT_LIST_HEAD(&b->w.list);
200 	b->next = NULL;
201 	b->br_number = 4711;
202 	b->n_req = 0;
203 	b->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
204 
205 	mdev->oldest_tle = b;
206 	mdev->newest_tle = b;
207 	INIT_LIST_HEAD(&mdev->out_of_sequence_requests);
208 
209 	mdev->tl_hash = NULL;
210 	mdev->tl_hash_s = 0;
211 
212 	return 1;
213 }
214 
215 static void tl_cleanup(struct drbd_conf *mdev)
216 {
217 	D_ASSERT(mdev->oldest_tle == mdev->newest_tle);
218 	D_ASSERT(list_empty(&mdev->out_of_sequence_requests));
219 	kfree(mdev->oldest_tle);
220 	mdev->oldest_tle = NULL;
221 	kfree(mdev->unused_spare_tle);
222 	mdev->unused_spare_tle = NULL;
223 	kfree(mdev->tl_hash);
224 	mdev->tl_hash = NULL;
225 	mdev->tl_hash_s = 0;
226 }
227 
228 /**
229  * _tl_add_barrier() - Adds a barrier to the transfer log
230  * @mdev:	DRBD device.
231  * @new:	Barrier to be added before the current head of the TL.
232  *
233  * The caller must hold the req_lock.
234  */
235 void _tl_add_barrier(struct drbd_conf *mdev, struct drbd_tl_epoch *new)
236 {
237 	struct drbd_tl_epoch *newest_before;
238 
239 	INIT_LIST_HEAD(&new->requests);
240 	INIT_LIST_HEAD(&new->w.list);
241 	new->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
242 	new->next = NULL;
243 	new->n_req = 0;
244 
245 	newest_before = mdev->newest_tle;
246 	/* never send a barrier number == 0, because that is special-cased
247 	 * when using TCQ for our write ordering code */
248 	new->br_number = (newest_before->br_number+1) ?: 1;
249 	if (mdev->newest_tle != new) {
250 		mdev->newest_tle->next = new;
251 		mdev->newest_tle = new;
252 	}
253 }
254 
255 /**
256  * tl_release() - Free or recycle the oldest &struct drbd_tl_epoch object of the TL
257  * @mdev:	DRBD device.
258  * @barrier_nr:	Expected identifier of the DRBD write barrier packet.
259  * @set_size:	Expected number of requests before that barrier.
260  *
261  * In case the passed barrier_nr or set_size does not match the oldest
262  * &struct drbd_tl_epoch object, this function will cause a termination
263  * of the connection.
264  */
265 void tl_release(struct drbd_conf *mdev, unsigned int barrier_nr,
266 		       unsigned int set_size)
267 {
268 	struct drbd_tl_epoch *b, *nob; /* next old barrier */
269 	struct list_head *le, *tle;
270 	struct drbd_request *r;
271 
272 	spin_lock_irq(&mdev->req_lock);
273 
274 	b = mdev->oldest_tle;
275 
276 	/* first some paranoia code */
277 	if (b == NULL) {
278 		dev_err(DEV, "BAD! BarrierAck #%u received, but no epoch in tl!?\n",
279 			barrier_nr);
280 		goto bail;
281 	}
282 	if (b->br_number != barrier_nr) {
283 		dev_err(DEV, "BAD! BarrierAck #%u received, expected #%u!\n",
284 			barrier_nr, b->br_number);
285 		goto bail;
286 	}
287 	if (b->n_req != set_size) {
288 		dev_err(DEV, "BAD! BarrierAck #%u received with n_req=%u, expected n_req=%u!\n",
289 			barrier_nr, set_size, b->n_req);
290 		goto bail;
291 	}
292 
293 	/* Clean up list of requests processed during current epoch */
294 	list_for_each_safe(le, tle, &b->requests) {
295 		r = list_entry(le, struct drbd_request, tl_requests);
296 		_req_mod(r, barrier_acked);
297 	}
298 	/* There could be requests on the list waiting for completion
299 	   of the write to the local disk. To avoid corruption of the
300 	   slab's data structures, we have to remove the list's head.
301 
302 	   Also there could have been a barrier ack out of sequence, overtaking
303 	   the write acks - which would be a bug and violate write ordering.
304 	   To not deadlock in case we lose connection while such requests are
305 	   still pending, we need some way to find them for the
306 	   _req_mod(connection_lost_while_pending).
307 
308 	   These have been list_move'd to the out_of_sequence_requests list in
309 	   _req_mod(, barrier_acked) above.
310 	   */
311 	list_del_init(&b->requests);
312 
313 	nob = b->next;
314 	if (test_and_clear_bit(CREATE_BARRIER, &mdev->flags)) {
315 		_tl_add_barrier(mdev, b);
316 		if (nob)
317 			mdev->oldest_tle = nob;
318 		/* if nob == NULL, b was the only barrier and becomes the new
319 		   barrier. Therefore mdev->oldest_tle already points to b */
320 	} else {
321 		D_ASSERT(nob != NULL);
322 		mdev->oldest_tle = nob;
323 		kfree(b);
324 	}
325 
326 	spin_unlock_irq(&mdev->req_lock);
327 	dec_ap_pending(mdev);
328 
329 	return;
330 
331 bail:
332 	spin_unlock_irq(&mdev->req_lock);
333 	drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
334 }
335 
336 
337 /**
338  * tl_clear() - Clears all requests and &struct drbd_tl_epoch objects out of the TL
339  * @mdev:	DRBD device.
340  *
341  * This is called after the connection to the peer was lost. The storage covered
342  * by the requests on the transfer log gets marked as out of sync. Called from the
343  * receiver thread and the worker thread.
344  */
345 void tl_clear(struct drbd_conf *mdev)
346 {
347 	struct drbd_tl_epoch *b, *tmp;
348 	struct list_head *le, *tle;
349 	struct drbd_request *r;
350 	int new_initial_bnr = net_random();
351 
352 	spin_lock_irq(&mdev->req_lock);
353 
354 	b = mdev->oldest_tle;
355 	while (b) {
356 		list_for_each_safe(le, tle, &b->requests) {
357 			r = list_entry(le, struct drbd_request, tl_requests);
358 			/* It would be nice to complete outside of spinlock.
359 			 * But this is easier for now. */
360 			_req_mod(r, connection_lost_while_pending);
361 		}
362 		tmp = b->next;
363 
364 		/* there could still be requests on that ring list,
365 		 * in case local io is still pending */
366 		list_del(&b->requests);
367 
368 		/* dec_ap_pending corresponding to queue_barrier.
369 		 * the newest barrier may not have been queued yet,
370 		 * in which case w.cb is still NULL. */
371 		if (b->w.cb != NULL)
372 			dec_ap_pending(mdev);
373 
374 		if (b == mdev->newest_tle) {
375 			/* recycle, but reinit! */
376 			D_ASSERT(tmp == NULL);
377 			INIT_LIST_HEAD(&b->requests);
378 			INIT_LIST_HEAD(&b->w.list);
379 			b->w.cb = NULL;
380 			b->br_number = new_initial_bnr;
381 			b->n_req = 0;
382 
383 			mdev->oldest_tle = b;
384 			break;
385 		}
386 		kfree(b);
387 		b = tmp;
388 	}
389 
390 	/* we expect this list to be empty. */
391 	D_ASSERT(list_empty(&mdev->out_of_sequence_requests));
392 
393 	/* but just in case, clean it up anyways! */
394 	list_for_each_safe(le, tle, &mdev->out_of_sequence_requests) {
395 		r = list_entry(le, struct drbd_request, tl_requests);
396 		/* It would be nice to complete outside of spinlock.
397 		 * But this is easier for now. */
398 		_req_mod(r, connection_lost_while_pending);
399 	}
400 
401 	/* ensure bit indicating barrier is required is clear */
402 	clear_bit(CREATE_BARRIER, &mdev->flags);
403 
404 	spin_unlock_irq(&mdev->req_lock);
405 }
406 
407 /**
408  * cl_wide_st_chg() - TRUE if the state change is a cluster wide one
409  * @mdev:	DRBD device.
410  * @os:		old (current) state.
411  * @ns:		new (wanted) state.
412  */
413 static int cl_wide_st_chg(struct drbd_conf *mdev,
414 			  union drbd_state os, union drbd_state ns)
415 {
416 	return (os.conn >= C_CONNECTED && ns.conn >= C_CONNECTED &&
417 		 ((os.role != R_PRIMARY && ns.role == R_PRIMARY) ||
418 		  (os.conn != C_STARTING_SYNC_T && ns.conn == C_STARTING_SYNC_T) ||
419 		  (os.conn != C_STARTING_SYNC_S && ns.conn == C_STARTING_SYNC_S) ||
420 		  (os.disk != D_DISKLESS && ns.disk == D_DISKLESS))) ||
421 		(os.conn >= C_CONNECTED && ns.conn == C_DISCONNECTING) ||
422 		(os.conn == C_CONNECTED && ns.conn == C_VERIFY_S);
423 }
424 
425 int drbd_change_state(struct drbd_conf *mdev, enum chg_state_flags f,
426 		      union drbd_state mask, union drbd_state val)
427 {
428 	unsigned long flags;
429 	union drbd_state os, ns;
430 	int rv;
431 
432 	spin_lock_irqsave(&mdev->req_lock, flags);
433 	os = mdev->state;
434 	ns.i = (os.i & ~mask.i) | val.i;
435 	rv = _drbd_set_state(mdev, ns, f, NULL);
436 	ns = mdev->state;
437 	spin_unlock_irqrestore(&mdev->req_lock, flags);
438 
439 	return rv;
440 }
441 
442 /**
443  * drbd_force_state() - Impose a change which happens outside our control on our state
444  * @mdev:	DRBD device.
445  * @mask:	mask of state bits to change.
446  * @val:	value of new state bits.
447  */
448 void drbd_force_state(struct drbd_conf *mdev,
449 	union drbd_state mask, union drbd_state val)
450 {
451 	drbd_change_state(mdev, CS_HARD, mask, val);
452 }
453 
454 static int is_valid_state(struct drbd_conf *mdev, union drbd_state ns);
455 static int is_valid_state_transition(struct drbd_conf *,
456 				     union drbd_state, union drbd_state);
457 static union drbd_state sanitize_state(struct drbd_conf *mdev, union drbd_state os,
458 				       union drbd_state ns, int *warn_sync_abort);
459 int drbd_send_state_req(struct drbd_conf *,
460 			union drbd_state, union drbd_state);
461 
462 static enum drbd_state_ret_codes _req_st_cond(struct drbd_conf *mdev,
463 				    union drbd_state mask, union drbd_state val)
464 {
465 	union drbd_state os, ns;
466 	unsigned long flags;
467 	int rv;
468 
469 	if (test_and_clear_bit(CL_ST_CHG_SUCCESS, &mdev->flags))
470 		return SS_CW_SUCCESS;
471 
472 	if (test_and_clear_bit(CL_ST_CHG_FAIL, &mdev->flags))
473 		return SS_CW_FAILED_BY_PEER;
474 
475 	rv = 0;
476 	spin_lock_irqsave(&mdev->req_lock, flags);
477 	os = mdev->state;
478 	ns.i = (os.i & ~mask.i) | val.i;
479 	ns = sanitize_state(mdev, os, ns, NULL);
480 
481 	if (!cl_wide_st_chg(mdev, os, ns))
482 		rv = SS_CW_NO_NEED;
483 	if (!rv) {
484 		rv = is_valid_state(mdev, ns);
485 		if (rv == SS_SUCCESS) {
486 			rv = is_valid_state_transition(mdev, ns, os);
487 			if (rv == SS_SUCCESS)
488 				rv = 0; /* cont waiting, otherwise fail. */
489 		}
490 	}
491 	spin_unlock_irqrestore(&mdev->req_lock, flags);
492 
493 	return rv;
494 }
495 
496 /**
497  * drbd_req_state() - Perform a possibly cluster-wide state change
498  * @mdev:	DRBD device.
499  * @mask:	mask of state bits to change.
500  * @val:	value of new state bits.
501  * @f:		flags
502  *
503  * Should not be called directly, use drbd_request_state() or
504  * _drbd_request_state().
505  */
506 static int drbd_req_state(struct drbd_conf *mdev,
507 			  union drbd_state mask, union drbd_state val,
508 			  enum chg_state_flags f)
509 {
510 	struct completion done;
511 	unsigned long flags;
512 	union drbd_state os, ns;
513 	int rv;
514 
515 	init_completion(&done);
516 
517 	if (f & CS_SERIALIZE)
518 		mutex_lock(&mdev->state_mutex);
519 
520 	spin_lock_irqsave(&mdev->req_lock, flags);
521 	os = mdev->state;
522 	ns.i = (os.i & ~mask.i) | val.i;
523 	ns = sanitize_state(mdev, os, ns, NULL);
524 
525 	if (cl_wide_st_chg(mdev, os, ns)) {
526 		rv = is_valid_state(mdev, ns);
527 		if (rv == SS_SUCCESS)
528 			rv = is_valid_state_transition(mdev, ns, os);
529 		spin_unlock_irqrestore(&mdev->req_lock, flags);
530 
531 		if (rv < SS_SUCCESS) {
532 			if (f & CS_VERBOSE)
533 				print_st_err(mdev, os, ns, rv);
534 			goto abort;
535 		}
536 
537 		drbd_state_lock(mdev);
538 		if (!drbd_send_state_req(mdev, mask, val)) {
539 			drbd_state_unlock(mdev);
540 			rv = SS_CW_FAILED_BY_PEER;
541 			if (f & CS_VERBOSE)
542 				print_st_err(mdev, os, ns, rv);
543 			goto abort;
544 		}
545 
546 		wait_event(mdev->state_wait,
547 			(rv = _req_st_cond(mdev, mask, val)));
548 
549 		if (rv < SS_SUCCESS) {
550 			drbd_state_unlock(mdev);
551 			if (f & CS_VERBOSE)
552 				print_st_err(mdev, os, ns, rv);
553 			goto abort;
554 		}
555 		spin_lock_irqsave(&mdev->req_lock, flags);
556 		os = mdev->state;
557 		ns.i = (os.i & ~mask.i) | val.i;
558 		rv = _drbd_set_state(mdev, ns, f, &done);
559 		drbd_state_unlock(mdev);
560 	} else {
561 		rv = _drbd_set_state(mdev, ns, f, &done);
562 	}
563 
564 	spin_unlock_irqrestore(&mdev->req_lock, flags);
565 
566 	if (f & CS_WAIT_COMPLETE && rv == SS_SUCCESS) {
567 		D_ASSERT(current != mdev->worker.task);
568 		wait_for_completion(&done);
569 	}
570 
571 abort:
572 	if (f & CS_SERIALIZE)
573 		mutex_unlock(&mdev->state_mutex);
574 
575 	return rv;
576 }
577 
578 /**
579  * _drbd_request_state() - Request a state change (with flags)
580  * @mdev:	DRBD device.
581  * @mask:	mask of state bits to change.
582  * @val:	value of new state bits.
583  * @f:		flags
584  *
585  * Cousin of drbd_request_state(), useful with the CS_WAIT_COMPLETE
586  * flag, or when logging of failed state change requests is not desired.
587  */
588 int _drbd_request_state(struct drbd_conf *mdev,	union drbd_state mask,
589 			union drbd_state val,	enum chg_state_flags f)
590 {
591 	int rv;
592 
593 	wait_event(mdev->state_wait,
594 		   (rv = drbd_req_state(mdev, mask, val, f)) != SS_IN_TRANSIENT_STATE);
595 
596 	return rv;
597 }
598 
599 static void print_st(struct drbd_conf *mdev, char *name, union drbd_state ns)
600 {
601 	dev_err(DEV, " %s = { cs:%s ro:%s/%s ds:%s/%s %c%c%c%c }\n",
602 	    name,
603 	    drbd_conn_str(ns.conn),
604 	    drbd_role_str(ns.role),
605 	    drbd_role_str(ns.peer),
606 	    drbd_disk_str(ns.disk),
607 	    drbd_disk_str(ns.pdsk),
608 	    ns.susp ? 's' : 'r',
609 	    ns.aftr_isp ? 'a' : '-',
610 	    ns.peer_isp ? 'p' : '-',
611 	    ns.user_isp ? 'u' : '-'
612 	    );
613 }
614 
615 void print_st_err(struct drbd_conf *mdev,
616 	union drbd_state os, union drbd_state ns, int err)
617 {
618 	if (err == SS_IN_TRANSIENT_STATE)
619 		return;
620 	dev_err(DEV, "State change failed: %s\n", drbd_set_st_err_str(err));
621 	print_st(mdev, " state", os);
622 	print_st(mdev, "wanted", ns);
623 }
624 
625 
626 #define drbd_peer_str drbd_role_str
627 #define drbd_pdsk_str drbd_disk_str
628 
629 #define drbd_susp_str(A)     ((A) ? "1" : "0")
630 #define drbd_aftr_isp_str(A) ((A) ? "1" : "0")
631 #define drbd_peer_isp_str(A) ((A) ? "1" : "0")
632 #define drbd_user_isp_str(A) ((A) ? "1" : "0")
633 
634 #define PSC(A) \
635 	({ if (ns.A != os.A) { \
636 		pbp += sprintf(pbp, #A "( %s -> %s ) ", \
637 			      drbd_##A##_str(os.A), \
638 			      drbd_##A##_str(ns.A)); \
639 	} })
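/*
 * For example (illustrative), a promotion would be logged by
 * __drbd_set_state() below roughly as:
 *
 *   role( Secondary -> Primary )
 *
 * with one "field( old -> new )" chunk per member that actually changed.
 */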
640 
641 /**
642  * is_valid_state() - Returns an SS_ error code if ns is not valid
643  * @mdev:	DRBD device.
644  * @ns:		State to consider.
645  */
646 static int is_valid_state(struct drbd_conf *mdev, union drbd_state ns)
647 {
648 	/* See drbd_state_sw_errors in drbd_strings.c */
649 
650 	enum drbd_fencing_p fp;
651 	int rv = SS_SUCCESS;
652 
653 	fp = FP_DONT_CARE;
654 	if (get_ldev(mdev)) {
655 		fp = mdev->ldev->dc.fencing;
656 		put_ldev(mdev);
657 	}
658 
659 	if (get_net_conf(mdev)) {
660 		if (!mdev->net_conf->two_primaries &&
661 		    ns.role == R_PRIMARY && ns.peer == R_PRIMARY)
662 			rv = SS_TWO_PRIMARIES;
663 		put_net_conf(mdev);
664 	}
665 
666 	if (rv <= 0)
667 		/* already found a reason to abort */;
668 	else if (ns.role == R_SECONDARY && mdev->open_cnt)
669 		rv = SS_DEVICE_IN_USE;
670 
671 	else if (ns.role == R_PRIMARY && ns.conn < C_CONNECTED && ns.disk < D_UP_TO_DATE)
672 		rv = SS_NO_UP_TO_DATE_DISK;
673 
674 	else if (fp >= FP_RESOURCE &&
675 		 ns.role == R_PRIMARY && ns.conn < C_CONNECTED && ns.pdsk >= D_UNKNOWN)
676 		rv = SS_PRIMARY_NOP;
677 
678 	else if (ns.role == R_PRIMARY && ns.disk <= D_INCONSISTENT && ns.pdsk <= D_INCONSISTENT)
679 		rv = SS_NO_UP_TO_DATE_DISK;
680 
681 	else if (ns.conn > C_CONNECTED && ns.disk < D_INCONSISTENT)
682 		rv = SS_NO_LOCAL_DISK;
683 
684 	else if (ns.conn > C_CONNECTED && ns.pdsk < D_INCONSISTENT)
685 		rv = SS_NO_REMOTE_DISK;
686 
687 	else if (ns.conn > C_CONNECTED && ns.disk < D_UP_TO_DATE && ns.pdsk < D_UP_TO_DATE)
688 		rv = SS_NO_UP_TO_DATE_DISK;
689 
690 	else if ((ns.conn == C_CONNECTED ||
691 		  ns.conn == C_WF_BITMAP_S ||
692 		  ns.conn == C_SYNC_SOURCE ||
693 		  ns.conn == C_PAUSED_SYNC_S) &&
694 		  ns.disk == D_OUTDATED)
695 		rv = SS_CONNECTED_OUTDATES;
696 
697 	else if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) &&
698 		 (mdev->sync_conf.verify_alg[0] == 0))
699 		rv = SS_NO_VERIFY_ALG;
700 
701 	else if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) &&
702 		  mdev->agreed_pro_version < 88)
703 		rv = SS_NOT_SUPPORTED;
704 
705 	return rv;
706 }
707 
708 /**
709  * is_valid_state_transition() - Returns an SS_ error code if the state transition is not possible
710  * @mdev:	DRBD device.
711  * @ns:		new state.
712  * @os:		old state.
713  */
714 static int is_valid_state_transition(struct drbd_conf *mdev,
715 				     union drbd_state ns, union drbd_state os)
716 {
717 	int rv = SS_SUCCESS;
718 
719 	if ((ns.conn == C_STARTING_SYNC_T || ns.conn == C_STARTING_SYNC_S) &&
720 	    os.conn > C_CONNECTED)
721 		rv = SS_RESYNC_RUNNING;
722 
723 	if (ns.conn == C_DISCONNECTING && os.conn == C_STANDALONE)
724 		rv = SS_ALREADY_STANDALONE;
725 
726 	if (ns.disk > D_ATTACHING && os.disk == D_DISKLESS)
727 		rv = SS_IS_DISKLESS;
728 
729 	if (ns.conn == C_WF_CONNECTION && os.conn < C_UNCONNECTED)
730 		rv = SS_NO_NET_CONFIG;
731 
732 	if (ns.disk == D_OUTDATED && os.disk < D_OUTDATED && os.disk != D_ATTACHING)
733 		rv = SS_LOWER_THAN_OUTDATED;
734 
735 	if (ns.conn == C_DISCONNECTING && os.conn == C_UNCONNECTED)
736 		rv = SS_IN_TRANSIENT_STATE;
737 
738 	if (ns.conn == os.conn && ns.conn == C_WF_REPORT_PARAMS)
739 		rv = SS_IN_TRANSIENT_STATE;
740 
741 	if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) && os.conn < C_CONNECTED)
742 		rv = SS_NEED_CONNECTION;
743 
744 	if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) &&
745 	    ns.conn != os.conn && os.conn > C_CONNECTED)
746 		rv = SS_RESYNC_RUNNING;
747 
748 	if ((ns.conn == C_STARTING_SYNC_S || ns.conn == C_STARTING_SYNC_T) &&
749 	    os.conn < C_CONNECTED)
750 		rv = SS_NEED_CONNECTION;
751 
752 	return rv;
753 }
754 
755 /**
756  * sanitize_state() - Resolves implicitly necessary additional changes to a state transition
757  * @mdev:	DRBD device.
758  * @os:		old state.
759  * @ns:		new state.
760  * @warn_sync_abort:
761  *
762  * When we lose the connection, we have to set the state of the peer's disk (pdsk)
763  * to D_UNKNOWN. This rule and many more along those lines are in this function.
764  */
765 static union drbd_state sanitize_state(struct drbd_conf *mdev, union drbd_state os,
766 				       union drbd_state ns, int *warn_sync_abort)
767 {
768 	enum drbd_fencing_p fp;
769 
770 	fp = FP_DONT_CARE;
771 	if (get_ldev(mdev)) {
772 		fp = mdev->ldev->dc.fencing;
773 		put_ldev(mdev);
774 	}
775 
776 	/* Do not let network errors reach a device whose network part is not configured */
777 	if ((ns.conn >= C_TIMEOUT && ns.conn <= C_TEAR_DOWN) &&
778 	    os.conn <= C_DISCONNECTING)
779 		ns.conn = os.conn;
780 
781 	/* After a network error (+C_TEAR_DOWN) only C_UNCONNECTED or C_DISCONNECTING can follow */
782 	if (os.conn >= C_TIMEOUT && os.conn <= C_TEAR_DOWN &&
783 	    ns.conn != C_UNCONNECTED && ns.conn != C_DISCONNECTING)
784 		ns.conn = os.conn;
785 
786 	/* After C_DISCONNECTING only C_STANDALONE may follow */
787 	if (os.conn == C_DISCONNECTING && ns.conn != C_STANDALONE)
788 		ns.conn = os.conn;
789 
790 	if (ns.conn < C_CONNECTED) {
791 		ns.peer_isp = 0;
792 		ns.peer = R_UNKNOWN;
793 		if (ns.pdsk > D_UNKNOWN || ns.pdsk < D_INCONSISTENT)
794 			ns.pdsk = D_UNKNOWN;
795 	}
796 
797 	/* Clear the aftr_isp when becoming unconfigured */
798 	if (ns.conn == C_STANDALONE && ns.disk == D_DISKLESS && ns.role == R_SECONDARY)
799 		ns.aftr_isp = 0;
800 
801 	if (ns.conn <= C_DISCONNECTING && ns.disk == D_DISKLESS)
802 		ns.pdsk = D_UNKNOWN;
803 
804 	/* Abort resync if a disk fails/detaches */
805 	if (os.conn > C_CONNECTED && ns.conn > C_CONNECTED &&
806 	    (ns.disk <= D_FAILED || ns.pdsk <= D_FAILED)) {
807 		if (warn_sync_abort)
808 			*warn_sync_abort = 1;
809 		ns.conn = C_CONNECTED;
810 	}
811 
812 	if (ns.conn >= C_CONNECTED &&
813 	    ((ns.disk == D_CONSISTENT || ns.disk == D_OUTDATED) ||
814 	     (ns.disk == D_NEGOTIATING && ns.conn == C_WF_BITMAP_T))) {
815 		switch (ns.conn) {
816 		case C_WF_BITMAP_T:
817 		case C_PAUSED_SYNC_T:
818 			ns.disk = D_OUTDATED;
819 			break;
820 		case C_CONNECTED:
821 		case C_WF_BITMAP_S:
822 		case C_SYNC_SOURCE:
823 		case C_PAUSED_SYNC_S:
824 			ns.disk = D_UP_TO_DATE;
825 			break;
826 		case C_SYNC_TARGET:
827 			ns.disk = D_INCONSISTENT;
828 			dev_warn(DEV, "Implicitly set disk state Inconsistent!\n");
829 			break;
830 		}
831 		if (os.disk == D_OUTDATED && ns.disk == D_UP_TO_DATE)
832 			dev_warn(DEV, "Implicitly set disk from Outdated to UpToDate\n");
833 	}
834 
835 	if (ns.conn >= C_CONNECTED &&
836 	    (ns.pdsk == D_CONSISTENT || ns.pdsk == D_OUTDATED)) {
837 		switch (ns.conn) {
838 		case C_CONNECTED:
839 		case C_WF_BITMAP_T:
840 		case C_PAUSED_SYNC_T:
841 		case C_SYNC_TARGET:
842 			ns.pdsk = D_UP_TO_DATE;
843 			break;
844 		case C_WF_BITMAP_S:
845 		case C_PAUSED_SYNC_S:
846 			/* remap any consistent state to D_OUTDATED,
847 			 * but do not "upgrade" states that are not even consistent.
848 			 */
849 			ns.pdsk =
850 				(D_DISKLESS < os.pdsk && os.pdsk < D_OUTDATED)
851 				? os.pdsk : D_OUTDATED;
852 			break;
853 		case C_SYNC_SOURCE:
854 			ns.pdsk = D_INCONSISTENT;
855 			dev_warn(DEV, "Implicitly set pdsk Inconsistent!\n");
856 			break;
857 		}
858 		if (os.pdsk == D_OUTDATED && ns.pdsk == D_UP_TO_DATE)
859 			dev_warn(DEV, "Implicitly set pdsk from Outdated to UpToDate\n");
860 	}
861 
862 	/* Connection breaks down before we finished "Negotiating" */
863 	if (ns.conn < C_CONNECTED && ns.disk == D_NEGOTIATING &&
864 	    get_ldev_if_state(mdev, D_NEGOTIATING)) {
865 		if (mdev->ed_uuid == mdev->ldev->md.uuid[UI_CURRENT]) {
866 			ns.disk = mdev->new_state_tmp.disk;
867 			ns.pdsk = mdev->new_state_tmp.pdsk;
868 		} else {
869 			dev_alert(DEV, "Connection lost while negotiating, no data!\n");
870 			ns.disk = D_DISKLESS;
871 			ns.pdsk = D_UNKNOWN;
872 		}
873 		put_ldev(mdev);
874 	}
875 
876 	if (fp == FP_STONITH &&
877 	    (ns.role == R_PRIMARY && ns.conn < C_CONNECTED && ns.pdsk > D_OUTDATED) &&
878 	    !(os.role == R_PRIMARY && os.conn < C_CONNECTED && os.pdsk > D_OUTDATED))
879 		ns.susp = 1;
880 
881 	if (ns.aftr_isp || ns.peer_isp || ns.user_isp) {
882 		if (ns.conn == C_SYNC_SOURCE)
883 			ns.conn = C_PAUSED_SYNC_S;
884 		if (ns.conn == C_SYNC_TARGET)
885 			ns.conn = C_PAUSED_SYNC_T;
886 	} else {
887 		if (ns.conn == C_PAUSED_SYNC_S)
888 			ns.conn = C_SYNC_SOURCE;
889 		if (ns.conn == C_PAUSED_SYNC_T)
890 			ns.conn = C_SYNC_TARGET;
891 	}
892 
893 	return ns;
894 }
895 
896 /* helper for __drbd_set_state */
897 static void set_ov_position(struct drbd_conf *mdev, enum drbd_conns cs)
898 {
899 	if (cs == C_VERIFY_T) {
900 		/* starting online verify from an arbitrary position
901 		 * does not fit well into the existing protocol.
902 		 * on C_VERIFY_T, we initialize ov_left and friends
903 		 * implicitly in receive_DataRequest once the
904 		 * first P_OV_REQUEST is received */
905 		mdev->ov_start_sector = ~(sector_t)0;
906 	} else {
907 		unsigned long bit = BM_SECT_TO_BIT(mdev->ov_start_sector);
908 		if (bit >= mdev->rs_total)
909 			mdev->ov_start_sector =
910 				BM_BIT_TO_SECT(mdev->rs_total - 1);
911 		mdev->ov_position = mdev->ov_start_sector;
912 	}
913 }
914 
915 /**
916  * __drbd_set_state() - Set a new DRBD state
917  * @mdev:	DRBD device.
918  * @ns:		new state.
919  * @flags:	Flags
920  * @done:	Optional completion that will be completed after after_state_ch() has finished
921  *
922  * Caller needs to hold req_lock, and global_state_lock. Do not call directly.
923  */
924 int __drbd_set_state(struct drbd_conf *mdev,
925 		    union drbd_state ns, enum chg_state_flags flags,
926 		    struct completion *done)
927 {
928 	union drbd_state os;
929 	int rv = SS_SUCCESS;
930 	int warn_sync_abort = 0;
931 	struct after_state_chg_work *ascw;
932 
933 	os = mdev->state;
934 
935 	ns = sanitize_state(mdev, os, ns, &warn_sync_abort);
936 
937 	if (ns.i == os.i)
938 		return SS_NOTHING_TO_DO;
939 
940 	if (!(flags & CS_HARD)) {
941 		/*  pre-state-change checks ; only look at ns  */
942 		/* See drbd_state_sw_errors in drbd_strings.c */
943 
944 		rv = is_valid_state(mdev, ns);
945 		if (rv < SS_SUCCESS) {
946 			/* If the old state was illegal as well, then let
947 			   this happen...*/
948 
949 			if (is_valid_state(mdev, os) == rv) {
950 				dev_err(DEV, "Considering state change from bad state. "
951 				    "Error would be: '%s'\n",
952 				    drbd_set_st_err_str(rv));
953 				print_st(mdev, "old", os);
954 				print_st(mdev, "new", ns);
955 				rv = is_valid_state_transition(mdev, ns, os);
956 			}
957 		} else
958 			rv = is_valid_state_transition(mdev, ns, os);
959 	}
960 
961 	if (rv < SS_SUCCESS) {
962 		if (flags & CS_VERBOSE)
963 			print_st_err(mdev, os, ns, rv);
964 		return rv;
965 	}
966 
967 	if (warn_sync_abort)
968 		dev_warn(DEV, "Resync aborted.\n");
969 
970 	{
971 		char *pbp, pb[300];
972 		pbp = pb;
973 		*pbp = 0;
974 		PSC(role);
975 		PSC(peer);
976 		PSC(conn);
977 		PSC(disk);
978 		PSC(pdsk);
979 		PSC(susp);
980 		PSC(aftr_isp);
981 		PSC(peer_isp);
982 		PSC(user_isp);
983 		dev_info(DEV, "%s\n", pb);
984 	}
985 
986 	/* solve the race between becoming unconfigured,
987 	 * worker doing the cleanup, and
988 	 * admin reconfiguring us:
989 	 * on (re)configure, first set CONFIG_PENDING,
990 	 * then wait for a potentially exiting worker,
991 	 * start the worker, and schedule one no_op.
992 	 * then proceed with configuration.
993 	 */
994 	if (ns.disk == D_DISKLESS &&
995 	    ns.conn == C_STANDALONE &&
996 	    ns.role == R_SECONDARY &&
997 	    !test_and_set_bit(CONFIG_PENDING, &mdev->flags))
998 		set_bit(DEVICE_DYING, &mdev->flags);
999 
1000 	mdev->state.i = ns.i;
1001 	wake_up(&mdev->misc_wait);
1002 	wake_up(&mdev->state_wait);
1003 
1004 	/*   post-state-change actions   */
1005 	if (os.conn >= C_SYNC_SOURCE   && ns.conn <= C_CONNECTED) {
1006 		set_bit(STOP_SYNC_TIMER, &mdev->flags);
1007 		mod_timer(&mdev->resync_timer, jiffies);
1008 	}
1009 
1010 	/* aborted verify run. log the last position */
1011 	if ((os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) &&
1012 	    ns.conn < C_CONNECTED) {
1013 		mdev->ov_start_sector =
1014 			BM_BIT_TO_SECT(mdev->rs_total - mdev->ov_left);
1015 		dev_info(DEV, "Online Verify reached sector %llu\n",
1016 			(unsigned long long)mdev->ov_start_sector);
1017 	}
1018 
1019 	if ((os.conn == C_PAUSED_SYNC_T || os.conn == C_PAUSED_SYNC_S) &&
1020 	    (ns.conn == C_SYNC_TARGET  || ns.conn == C_SYNC_SOURCE)) {
1021 		dev_info(DEV, "Syncer continues.\n");
1022 		mdev->rs_paused += (long)jiffies-(long)mdev->rs_mark_time;
1023 		if (ns.conn == C_SYNC_TARGET) {
1024 			if (!test_and_clear_bit(STOP_SYNC_TIMER, &mdev->flags))
1025 				mod_timer(&mdev->resync_timer, jiffies);
1026 			/* This if (!test_bit) is only needed for the case
1027 			   that a device that has ceased to use its timer,
1028 			   i.e. it is already in drbd_resync_finished(), gets
1029 			   paused and resumed. */
1030 		}
1031 	}
1032 
1033 	if ((os.conn == C_SYNC_TARGET  || os.conn == C_SYNC_SOURCE) &&
1034 	    (ns.conn == C_PAUSED_SYNC_T || ns.conn == C_PAUSED_SYNC_S)) {
1035 		dev_info(DEV, "Resync suspended\n");
1036 		mdev->rs_mark_time = jiffies;
1037 		if (ns.conn == C_PAUSED_SYNC_T)
1038 			set_bit(STOP_SYNC_TIMER, &mdev->flags);
1039 	}
1040 
1041 	if (os.conn == C_CONNECTED &&
1042 	    (ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T)) {
1043 		mdev->ov_position = 0;
1044 		mdev->rs_total =
1045 		mdev->rs_mark_left = drbd_bm_bits(mdev);
1046 		if (mdev->agreed_pro_version >= 90)
1047 			set_ov_position(mdev, ns.conn);
1048 		else
1049 			mdev->ov_start_sector = 0;
1050 		mdev->ov_left = mdev->rs_total
1051 			      - BM_SECT_TO_BIT(mdev->ov_position);
1052 		mdev->rs_start     =
1053 		mdev->rs_mark_time = jiffies;
1054 		mdev->ov_last_oos_size = 0;
1055 		mdev->ov_last_oos_start = 0;
1056 
1057 		if (ns.conn == C_VERIFY_S) {
1058 			dev_info(DEV, "Starting Online Verify from sector %llu\n",
1059 					(unsigned long long)mdev->ov_position);
1060 			mod_timer(&mdev->resync_timer, jiffies);
1061 		}
1062 	}
1063 
1064 	if (get_ldev(mdev)) {
1065 		u32 mdf = mdev->ldev->md.flags & ~(MDF_CONSISTENT|MDF_PRIMARY_IND|
1066 						 MDF_CONNECTED_IND|MDF_WAS_UP_TO_DATE|
1067 						 MDF_PEER_OUT_DATED|MDF_CRASHED_PRIMARY);
1068 
1069 		if (test_bit(CRASHED_PRIMARY, &mdev->flags))
1070 			mdf |= MDF_CRASHED_PRIMARY;
1071 		if (mdev->state.role == R_PRIMARY ||
1072 		    (mdev->state.pdsk < D_INCONSISTENT && mdev->state.peer == R_PRIMARY))
1073 			mdf |= MDF_PRIMARY_IND;
1074 		if (mdev->state.conn > C_WF_REPORT_PARAMS)
1075 			mdf |= MDF_CONNECTED_IND;
1076 		if (mdev->state.disk > D_INCONSISTENT)
1077 			mdf |= MDF_CONSISTENT;
1078 		if (mdev->state.disk > D_OUTDATED)
1079 			mdf |= MDF_WAS_UP_TO_DATE;
1080 		if (mdev->state.pdsk <= D_OUTDATED && mdev->state.pdsk >= D_INCONSISTENT)
1081 			mdf |= MDF_PEER_OUT_DATED;
1082 		if (mdf != mdev->ldev->md.flags) {
1083 			mdev->ldev->md.flags = mdf;
1084 			drbd_md_mark_dirty(mdev);
1085 		}
1086 		if (os.disk < D_CONSISTENT && ns.disk >= D_CONSISTENT)
1087 			drbd_set_ed_uuid(mdev, mdev->ldev->md.uuid[UI_CURRENT]);
1088 		put_ldev(mdev);
1089 	}
1090 
1091 	/* Peer was forced D_UP_TO_DATE & R_PRIMARY, consider resyncing */
1092 	if (os.disk == D_INCONSISTENT && os.pdsk == D_INCONSISTENT &&
1093 	    os.peer == R_SECONDARY && ns.peer == R_PRIMARY)
1094 		set_bit(CONSIDER_RESYNC, &mdev->flags);
1095 
1096 	/* Receiver should clean up itself */
1097 	if (os.conn != C_DISCONNECTING && ns.conn == C_DISCONNECTING)
1098 		drbd_thread_stop_nowait(&mdev->receiver);
1099 
1100 	/* Now that the receiver has finished cleaning itself up, it should die */
1101 	if (os.conn != C_STANDALONE && ns.conn == C_STANDALONE)
1102 		drbd_thread_stop_nowait(&mdev->receiver);
1103 
1104 	/* Upon network failure, we need to restart the receiver. */
1105 	if (os.conn > C_TEAR_DOWN &&
1106 	    ns.conn <= C_TEAR_DOWN && ns.conn >= C_TIMEOUT)
1107 		drbd_thread_restart_nowait(&mdev->receiver);
1108 
1109 	ascw = kmalloc(sizeof(*ascw), GFP_ATOMIC);
1110 	if (ascw) {
1111 		ascw->os = os;
1112 		ascw->ns = ns;
1113 		ascw->flags = flags;
1114 		ascw->w.cb = w_after_state_ch;
1115 		ascw->done = done;
1116 		drbd_queue_work(&mdev->data.work, &ascw->w);
1117 	} else {
1118 		dev_warn(DEV, "Could not kmalloc an ascw\n");
1119 	}
1120 
1121 	return rv;
1122 }
1123 
1124 static int w_after_state_ch(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1125 {
1126 	struct after_state_chg_work *ascw =
1127 		container_of(w, struct after_state_chg_work, w);
1128 	after_state_ch(mdev, ascw->os, ascw->ns, ascw->flags);
1129 	if (ascw->flags & CS_WAIT_COMPLETE) {
1130 		D_ASSERT(ascw->done != NULL);
1131 		complete(ascw->done);
1132 	}
1133 	kfree(ascw);
1134 
1135 	return 1;
1136 }
1137 
1138 static void abw_start_sync(struct drbd_conf *mdev, int rv)
1139 {
1140 	if (rv) {
1141 		dev_err(DEV, "Writing the bitmap failed, not starting resync.\n");
1142 		_drbd_request_state(mdev, NS(conn, C_CONNECTED), CS_VERBOSE);
1143 		return;
1144 	}
1145 
1146 	switch (mdev->state.conn) {
1147 	case C_STARTING_SYNC_T:
1148 		_drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
1149 		break;
1150 	case C_STARTING_SYNC_S:
1151 		drbd_start_resync(mdev, C_SYNC_SOURCE);
1152 		break;
1153 	}
1154 }
1155 
1156 /**
1157  * after_state_ch() - Perform after state change actions that may sleep
1158  * @mdev:	DRBD device.
1159  * @os:		old state.
1160  * @ns:		new state.
1161  * @flags:	Flags
1162  */
1163 static void after_state_ch(struct drbd_conf *mdev, union drbd_state os,
1164 			   union drbd_state ns, enum chg_state_flags flags)
1165 {
1166 	enum drbd_fencing_p fp;
1167 
1168 	if (os.conn != C_CONNECTED && ns.conn == C_CONNECTED) {
1169 		clear_bit(CRASHED_PRIMARY, &mdev->flags);
1170 		if (mdev->p_uuid)
1171 			mdev->p_uuid[UI_FLAGS] &= ~((u64)2);
1172 	}
1173 
1174 	fp = FP_DONT_CARE;
1175 	if (get_ldev(mdev)) {
1176 		fp = mdev->ldev->dc.fencing;
1177 		put_ldev(mdev);
1178 	}
1179 
1180 	/* Inform userspace about the change... */
1181 	drbd_bcast_state(mdev, ns);
1182 
1183 	if (!(os.role == R_PRIMARY && os.disk < D_UP_TO_DATE && os.pdsk < D_UP_TO_DATE) &&
1184 	    (ns.role == R_PRIMARY && ns.disk < D_UP_TO_DATE && ns.pdsk < D_UP_TO_DATE))
1185 		drbd_khelper(mdev, "pri-on-incon-degr");
1186 
1187 	/* Here we have the actions that are performed after a
1188 	   state change. This function might sleep */
1189 
1190 	if (fp == FP_STONITH && ns.susp) {
1191 		/* case1: The outdate peer handler is successful:
1192 		 * case2: The connection was established again: */
1193 		if ((os.pdsk > D_OUTDATED  && ns.pdsk <= D_OUTDATED) ||
1194 		    (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED)) {
1195 			tl_clear(mdev);
1196 			spin_lock_irq(&mdev->req_lock);
1197 			_drbd_set_state(_NS(mdev, susp, 0), CS_VERBOSE, NULL);
1198 			spin_unlock_irq(&mdev->req_lock);
1199 		}
1200 	}
1201 	/* Do not change the order of the if above and the two below... */
1202 	if (os.pdsk == D_DISKLESS && ns.pdsk > D_DISKLESS) {      /* attach on the peer */
1203 		drbd_send_uuids(mdev);
1204 		drbd_send_state(mdev);
1205 	}
1206 	if (os.conn != C_WF_BITMAP_S && ns.conn == C_WF_BITMAP_S)
1207 		drbd_queue_bitmap_io(mdev, &drbd_send_bitmap, NULL, "send_bitmap (WFBitMapS)");
1208 
1209 	/* Lost contact to peer's copy of the data */
1210 	if ((os.pdsk >= D_INCONSISTENT &&
1211 	     os.pdsk != D_UNKNOWN &&
1212 	     os.pdsk != D_OUTDATED)
1213 	&&  (ns.pdsk < D_INCONSISTENT ||
1214 	     ns.pdsk == D_UNKNOWN ||
1215 	     ns.pdsk == D_OUTDATED)) {
1216 		if (get_ldev(mdev)) {
1217 			if ((ns.role == R_PRIMARY || ns.peer == R_PRIMARY) &&
1218 			    mdev->ldev->md.uuid[UI_BITMAP] == 0 && ns.disk >= D_UP_TO_DATE) {
1219 				drbd_uuid_new_current(mdev);
1220 				drbd_send_uuids(mdev);
1221 			}
1222 			put_ldev(mdev);
1223 		}
1224 	}
1225 
1226 	if (ns.pdsk < D_INCONSISTENT && get_ldev(mdev)) {
1227 		if (ns.peer == R_PRIMARY && mdev->ldev->md.uuid[UI_BITMAP] == 0)
1228 			drbd_uuid_new_current(mdev);
1229 
1230 		/* D_DISKLESS Peer becomes secondary */
1231 		if (os.peer == R_PRIMARY && ns.peer == R_SECONDARY)
1232 			drbd_al_to_on_disk_bm(mdev);
1233 		put_ldev(mdev);
1234 	}
1235 
1236 	/* Last part of the attaching process ... */
1237 	if (ns.conn >= C_CONNECTED &&
1238 	    os.disk == D_ATTACHING && ns.disk == D_NEGOTIATING) {
1239 		drbd_send_sizes(mdev, 0, 0);  /* to start sync... */
1240 		drbd_send_uuids(mdev);
1241 		drbd_send_state(mdev);
1242 	}
1243 
1244 	/* We want to pause/continue resync, tell peer. */
1245 	if (ns.conn >= C_CONNECTED &&
1246 	     ((os.aftr_isp != ns.aftr_isp) ||
1247 	      (os.user_isp != ns.user_isp)))
1248 		drbd_send_state(mdev);
1249 
1250 	/* In case one of the isp bits got set, suspend other devices. */
1251 	if ((!os.aftr_isp && !os.peer_isp && !os.user_isp) &&
1252 	    (ns.aftr_isp || ns.peer_isp || ns.user_isp))
1253 		suspend_other_sg(mdev);
1254 
1255 	/* Make sure the peer gets informed about any state
1256 	   changes (ISP bits) that happened while we were in WFReportParams. */
1257 	if (os.conn == C_WF_REPORT_PARAMS && ns.conn >= C_CONNECTED)
1258 		drbd_send_state(mdev);
1259 
1260 	/* We are in the process of starting a full sync... */
1261 	if ((os.conn != C_STARTING_SYNC_T && ns.conn == C_STARTING_SYNC_T) ||
1262 	    (os.conn != C_STARTING_SYNC_S && ns.conn == C_STARTING_SYNC_S))
1263 		drbd_queue_bitmap_io(mdev, &drbd_bmio_set_n_write, &abw_start_sync, "set_n_write from StartingSync");
1264 
1265 	/* We are invalidating ourselves... */
1266 	if (os.conn < C_CONNECTED && ns.conn < C_CONNECTED &&
1267 	    os.disk > D_INCONSISTENT && ns.disk == D_INCONSISTENT)
1268 		drbd_queue_bitmap_io(mdev, &drbd_bmio_set_n_write, NULL, "set_n_write from invalidate");
1269 
1270 	if (os.disk > D_FAILED && ns.disk == D_FAILED) {
1271 		enum drbd_io_error_p eh;
1272 
1273 		eh = EP_PASS_ON;
1274 		if (get_ldev_if_state(mdev, D_FAILED)) {
1275 			eh = mdev->ldev->dc.on_io_error;
1276 			put_ldev(mdev);
1277 		}
1278 
1279 		drbd_rs_cancel_all(mdev);
1280 		/* since get_ldev() only works as long as disk>=D_INCONSISTENT,
1281 		   and it is D_DISKLESS here, local_cnt can only go down, it can
1282 		   not increase... It will reach zero */
1283 		wait_event(mdev->misc_wait, !atomic_read(&mdev->local_cnt));
1284 		mdev->rs_total = 0;
1285 		mdev->rs_failed = 0;
1286 		atomic_set(&mdev->rs_pending_cnt, 0);
1287 
1288 		spin_lock_irq(&mdev->req_lock);
1289 		_drbd_set_state(_NS(mdev, disk, D_DISKLESS), CS_HARD, NULL);
1290 		spin_unlock_irq(&mdev->req_lock);
1291 
1292 		if (eh == EP_CALL_HELPER)
1293 			drbd_khelper(mdev, "local-io-error");
1294 	}
1295 
1296 	if (os.disk > D_DISKLESS && ns.disk == D_DISKLESS) {
1297 
1298 		if (os.disk == D_FAILED) /* && ns.disk == D_DISKLESS*/ {
1299 			if (drbd_send_state(mdev))
1300 				dev_warn(DEV, "Notified peer that my disk is broken.\n");
1301 			else
1302 				dev_err(DEV, "Sending state in drbd_io_error() failed\n");
1303 		}
1304 
1305 		wait_event(mdev->misc_wait, !atomic_read(&mdev->local_cnt));
1306 		lc_destroy(mdev->resync);
1307 		mdev->resync = NULL;
1308 		lc_destroy(mdev->act_log);
1309 		mdev->act_log = NULL;
1310 		__no_warn(local,
1311 			drbd_free_bc(mdev->ldev);
1312 			mdev->ldev = NULL;);
1313 
1314 		if (mdev->md_io_tmpp)
1315 			__free_page(mdev->md_io_tmpp);
1316 	}
1317 
1318 	/* Disks got bigger while they were detached */
1319 	if (ns.disk > D_NEGOTIATING && ns.pdsk > D_NEGOTIATING &&
1320 	    test_and_clear_bit(RESYNC_AFTER_NEG, &mdev->flags)) {
1321 		if (ns.conn == C_CONNECTED)
1322 			resync_after_online_grow(mdev);
1323 	}
1324 
1325 	/* A resync finished or aborted, wake paused devices... */
1326 	if ((os.conn > C_CONNECTED && ns.conn <= C_CONNECTED) ||
1327 	    (os.peer_isp && !ns.peer_isp) ||
1328 	    (os.user_isp && !ns.user_isp))
1329 		resume_next_sg(mdev);
1330 
1331 	/* Upon network connection, we need to start the receiver */
1332 	if (os.conn == C_STANDALONE && ns.conn == C_UNCONNECTED)
1333 		drbd_thread_start(&mdev->receiver);
1334 
1335 	/* Terminate worker thread if we are unconfigured - it will be
1336 	   restarted as needed... */
1337 	if (ns.disk == D_DISKLESS &&
1338 	    ns.conn == C_STANDALONE &&
1339 	    ns.role == R_SECONDARY) {
1340 		if (os.aftr_isp != ns.aftr_isp)
1341 			resume_next_sg(mdev);
1342 		/* set in __drbd_set_state, unless CONFIG_PENDING was set */
1343 		if (test_bit(DEVICE_DYING, &mdev->flags))
1344 			drbd_thread_stop_nowait(&mdev->worker);
1345 	}
1346 
1347 	drbd_md_sync(mdev);
1348 }
1349 
1350 
1351 static int drbd_thread_setup(void *arg)
1352 {
1353 	struct drbd_thread *thi = (struct drbd_thread *) arg;
1354 	struct drbd_conf *mdev = thi->mdev;
1355 	unsigned long flags;
1356 	int retval;
1357 
1358 restart:
1359 	retval = thi->function(thi);
1360 
1361 	spin_lock_irqsave(&thi->t_lock, flags);
1362 
1363 	/* if the receiver has been "Exiting", the last thing it did
1364 	 * was set the conn state to "StandAlone",
1365 	 * if now a re-connect request comes in, conn state goes C_UNCONNECTED,
1366 	 * and receiver thread will be "started".
1367 	 * drbd_thread_start needs to set "Restarting" in that case.
1368 	 * The t_state check and assignment need to be within the same spinlock,
1369 	 * so either thread_start sees Exiting, and can remap to Restarting,
1370 	 * or thread_start sees None, and can proceed as normal.
1371 	 */
1372 
1373 	if (thi->t_state == Restarting) {
1374 		dev_info(DEV, "Restarting %s\n", current->comm);
1375 		thi->t_state = Running;
1376 		spin_unlock_irqrestore(&thi->t_lock, flags);
1377 		goto restart;
1378 	}
1379 
1380 	thi->task = NULL;
1381 	thi->t_state = None;
1382 	smp_mb();
1383 	complete(&thi->stop);
1384 	spin_unlock_irqrestore(&thi->t_lock, flags);
1385 
1386 	dev_info(DEV, "Terminating %s\n", current->comm);
1387 
1388 	/* Release mod reference taken when thread was started */
1389 	module_put(THIS_MODULE);
1390 	return retval;
1391 }
1392 
1393 static void drbd_thread_init(struct drbd_conf *mdev, struct drbd_thread *thi,
1394 		      int (*func) (struct drbd_thread *))
1395 {
1396 	spin_lock_init(&thi->t_lock);
1397 	thi->task    = NULL;
1398 	thi->t_state = None;
1399 	thi->function = func;
1400 	thi->mdev = mdev;
1401 }
1402 
1403 int drbd_thread_start(struct drbd_thread *thi)
1404 {
1405 	struct drbd_conf *mdev = thi->mdev;
1406 	struct task_struct *nt;
1407 	unsigned long flags;
1408 
1409 	const char *me =
1410 		thi == &mdev->receiver ? "receiver" :
1411 		thi == &mdev->asender  ? "asender"  :
1412 		thi == &mdev->worker   ? "worker"   : "NONSENSE";
1413 
1414 	/* is used from state engine doing drbd_thread_stop_nowait,
1415 	 * while holding the req lock irqsave */
1416 	spin_lock_irqsave(&thi->t_lock, flags);
1417 
1418 	switch (thi->t_state) {
1419 	case None:
1420 		dev_info(DEV, "Starting %s thread (from %s [%d])\n",
1421 				me, current->comm, current->pid);
1422 
1423 		/* Get ref on module for thread - this is released when thread exits */
1424 		if (!try_module_get(THIS_MODULE)) {
1425 			dev_err(DEV, "Failed to get module reference in drbd_thread_start\n");
1426 			spin_unlock_irqrestore(&thi->t_lock, flags);
1427 			return FALSE;
1428 		}
1429 
1430 		init_completion(&thi->stop);
1431 		D_ASSERT(thi->task == NULL);
1432 		thi->reset_cpu_mask = 1;
1433 		thi->t_state = Running;
1434 		spin_unlock_irqrestore(&thi->t_lock, flags);
1435 		flush_signals(current); /* otherw. may get -ERESTARTNOINTR */
1436 
1437 		nt = kthread_create(drbd_thread_setup, (void *) thi,
1438 				    "drbd%d_%s", mdev_to_minor(mdev), me);
1439 
1440 		if (IS_ERR(nt)) {
1441 			dev_err(DEV, "Couldn't start thread\n");
1442 
1443 			module_put(THIS_MODULE);
1444 			return FALSE;
1445 		}
1446 		spin_lock_irqsave(&thi->t_lock, flags);
1447 		thi->task = nt;
1448 		thi->t_state = Running;
1449 		spin_unlock_irqrestore(&thi->t_lock, flags);
1450 		wake_up_process(nt);
1451 		break;
1452 	case Exiting:
1453 		thi->t_state = Restarting;
1454 		dev_info(DEV, "Restarting %s thread (from %s [%d])\n",
1455 				me, current->comm, current->pid);
1456 		/* fall through */
1457 	case Running:
1458 	case Restarting:
1459 	default:
1460 		spin_unlock_irqrestore(&thi->t_lock, flags);
1461 		break;
1462 	}
1463 
1464 	return TRUE;
1465 }
1466 
1467 
1468 void _drbd_thread_stop(struct drbd_thread *thi, int restart, int wait)
1469 {
1470 	unsigned long flags;
1471 
1472 	enum drbd_thread_state ns = restart ? Restarting : Exiting;
1473 
1474 	/* may be called from state engine, holding the req lock irqsave */
1475 	spin_lock_irqsave(&thi->t_lock, flags);
1476 
1477 	if (thi->t_state == None) {
1478 		spin_unlock_irqrestore(&thi->t_lock, flags);
1479 		if (restart)
1480 			drbd_thread_start(thi);
1481 		return;
1482 	}
1483 
1484 	if (thi->t_state != ns) {
1485 		if (thi->task == NULL) {
1486 			spin_unlock_irqrestore(&thi->t_lock, flags);
1487 			return;
1488 		}
1489 
1490 		thi->t_state = ns;
1491 		smp_mb();
1492 		init_completion(&thi->stop);
1493 		if (thi->task != current)
1494 			force_sig(DRBD_SIGKILL, thi->task);
1495 
1496 	}
1497 
1498 	spin_unlock_irqrestore(&thi->t_lock, flags);
1499 
1500 	if (wait)
1501 		wait_for_completion(&thi->stop);
1502 }
1503 
1504 #ifdef CONFIG_SMP
1505 /**
1506  * drbd_calc_cpu_mask() - Generate CPU masks, spread over all CPUs
1507  * @mdev:	DRBD device.
1508  *
1509  * Forces all threads of a device onto the same CPU. This is beneficial for
1510  * DRBD's performance. May be overridden by the user's configuration.
1511  */
1512 void drbd_calc_cpu_mask(struct drbd_conf *mdev)
1513 {
1514 	int ord, cpu;
1515 
1516 	/* user override. */
1517 	if (cpumask_weight(mdev->cpu_mask))
1518 		return;
1519 
1520 	ord = mdev_to_minor(mdev) % cpumask_weight(cpu_online_mask);
1521 	for_each_online_cpu(cpu) {
1522 		if (ord-- == 0) {
1523 			cpumask_set_cpu(cpu, mdev->cpu_mask);
1524 			return;
1525 		}
1526 	}
1527 	/* should not be reached */
1528 	cpumask_setall(mdev->cpu_mask);
1529 }
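/*
 * Example (illustrative): with four CPUs online, the modulo above pins
 * the threads of minor 0 to the first online CPU, minor 1 to the second,
 * and minor 5 back to the second again - unless the user configured an
 * explicit cpu_mask, which is honored by the early return.
 */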
1530 
1531 /**
1532  * drbd_thread_current_set_cpu() - modifies the cpu mask of the _current_ thread
1533  * @mdev:	DRBD device.
1534  *
1535  * Call this in the "main loop" of _all_ threads; no mutex is needed, current won't die
1536  * prematurely.
1537  */
1538 void drbd_thread_current_set_cpu(struct drbd_conf *mdev)
1539 {
1540 	struct task_struct *p = current;
1541 	struct drbd_thread *thi =
1542 		p == mdev->asender.task  ? &mdev->asender  :
1543 		p == mdev->receiver.task ? &mdev->receiver :
1544 		p == mdev->worker.task   ? &mdev->worker   :
1545 		NULL;
1546 	ERR_IF(thi == NULL)
1547 		return;
1548 	if (!thi->reset_cpu_mask)
1549 		return;
1550 	thi->reset_cpu_mask = 0;
1551 	set_cpus_allowed_ptr(p, mdev->cpu_mask);
1552 }
1553 #endif
1554 
1555 /* the appropriate socket mutex must be held already */
1556 int _drbd_send_cmd(struct drbd_conf *mdev, struct socket *sock,
1557 			  enum drbd_packets cmd, struct p_header *h,
1558 			  size_t size, unsigned msg_flags)
1559 {
1560 	int sent, ok;
1561 
1562 	ERR_IF(!h) return FALSE;
1563 	ERR_IF(!size) return FALSE;
1564 
1565 	h->magic   = BE_DRBD_MAGIC;
1566 	h->command = cpu_to_be16(cmd);
1567 	h->length  = cpu_to_be16(size-sizeof(struct p_header));
1568 
1569 	sent = drbd_send(mdev, sock, h, size, msg_flags);
1570 
1571 	ok = (sent == size);
1572 	if (!ok)
1573 		dev_err(DEV, "short sent %s size=%d sent=%d\n",
1574 		    cmdname(cmd), (int)size, sent);
1575 	return ok;
1576 }
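/*
 * Wire format note (sketch of what the code above sends): the header
 * carries the command and a length field that counts only the payload,
 * i.e. for a packet occupying 'size' bytes in total,
 *
 *	h->length == cpu_to_be16(size - sizeof(struct p_header))
 *
 * so a receiver can read the fixed-size header first and then
 * be16_to_cpu(h->length) bytes of payload.
 */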
1577 
1578 /* don't pass the socket. we may only look at it
1579  * when we hold the appropriate socket mutex.
1580  */
1581 int drbd_send_cmd(struct drbd_conf *mdev, int use_data_socket,
1582 		  enum drbd_packets cmd, struct p_header *h, size_t size)
1583 {
1584 	int ok = 0;
1585 	struct socket *sock;
1586 
1587 	if (use_data_socket) {
1588 		mutex_lock(&mdev->data.mutex);
1589 		sock = mdev->data.socket;
1590 	} else {
1591 		mutex_lock(&mdev->meta.mutex);
1592 		sock = mdev->meta.socket;
1593 	}
1594 
1595 	/* drbd_disconnect() could have called drbd_free_sock()
1596 	 * while we were waiting in down()... */
1597 	if (likely(sock != NULL))
1598 		ok = _drbd_send_cmd(mdev, sock, cmd, h, size, 0);
1599 
1600 	if (use_data_socket)
1601 		mutex_unlock(&mdev->data.mutex);
1602 	else
1603 		mutex_unlock(&mdev->meta.mutex);
1604 	return ok;
1605 }
1606 
1607 int drbd_send_cmd2(struct drbd_conf *mdev, enum drbd_packets cmd, char *data,
1608 		   size_t size)
1609 {
1610 	struct p_header h;
1611 	int ok;
1612 
1613 	h.magic   = BE_DRBD_MAGIC;
1614 	h.command = cpu_to_be16(cmd);
1615 	h.length  = cpu_to_be16(size);
1616 
1617 	if (!drbd_get_data_sock(mdev))
1618 		return 0;
1619 
1620 	ok = (sizeof(h) ==
1621 		drbd_send(mdev, mdev->data.socket, &h, sizeof(h), 0));
1622 	ok = ok && (size ==
1623 		drbd_send(mdev, mdev->data.socket, data, size, 0));
1624 
1625 	drbd_put_data_sock(mdev);
1626 
1627 	return ok;
1628 }
1629 
1630 int drbd_send_sync_param(struct drbd_conf *mdev, struct syncer_conf *sc)
1631 {
1632 	struct p_rs_param_89 *p;
1633 	struct socket *sock;
1634 	int size, rv;
1635 	const int apv = mdev->agreed_pro_version;
1636 
1637 	size = apv <= 87 ? sizeof(struct p_rs_param)
1638 		: apv == 88 ? sizeof(struct p_rs_param)
1639 			+ strlen(mdev->sync_conf.verify_alg) + 1
1640 		: /* 89 */    sizeof(struct p_rs_param_89);
1641 
1642 	/* used from admin command context and receiver/worker context.
1643 	 * to avoid kmalloc, grab the socket right here,
1644 	 * then use the pre-allocated sbuf there */
1645 	mutex_lock(&mdev->data.mutex);
1646 	sock = mdev->data.socket;
1647 
1648 	if (likely(sock != NULL)) {
1649 		enum drbd_packets cmd = apv >= 89 ? P_SYNC_PARAM89 : P_SYNC_PARAM;
1650 
1651 		p = &mdev->data.sbuf.rs_param_89;
1652 
1653 		/* initialize verify_alg and csums_alg */
1654 		memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
1655 
1656 		p->rate = cpu_to_be32(sc->rate);
1657 
1658 		if (apv >= 88)
1659 			strcpy(p->verify_alg, mdev->sync_conf.verify_alg);
1660 		if (apv >= 89)
1661 			strcpy(p->csums_alg, mdev->sync_conf.csums_alg);
1662 
1663 		rv = _drbd_send_cmd(mdev, sock, cmd, &p->head, size, 0);
1664 	} else
1665 		rv = 0; /* not ok */
1666 
1667 	mutex_unlock(&mdev->data.mutex);
1668 
1669 	return rv;
1670 }
1671 
1672 int drbd_send_protocol(struct drbd_conf *mdev)
1673 {
1674 	struct p_protocol *p;
1675 	int size, cf, rv;
1676 
1677 	size = sizeof(struct p_protocol);
1678 
1679 	if (mdev->agreed_pro_version >= 87)
1680 		size += strlen(mdev->net_conf->integrity_alg) + 1;
1681 
1682 	/* we must not recurse into our own queue,
1683 	 * as that is blocked during handshake */
1684 	p = kmalloc(size, GFP_NOIO);
1685 	if (p == NULL)
1686 		return 0;
1687 
1688 	p->protocol      = cpu_to_be32(mdev->net_conf->wire_protocol);
1689 	p->after_sb_0p   = cpu_to_be32(mdev->net_conf->after_sb_0p);
1690 	p->after_sb_1p   = cpu_to_be32(mdev->net_conf->after_sb_1p);
1691 	p->after_sb_2p   = cpu_to_be32(mdev->net_conf->after_sb_2p);
1692 	p->two_primaries = cpu_to_be32(mdev->net_conf->two_primaries);
1693 
1694 	cf = 0;
1695 	if (mdev->net_conf->want_lose)
1696 		cf |= CF_WANT_LOSE;
1697 	if (mdev->net_conf->dry_run) {
1698 		if (mdev->agreed_pro_version >= 92)
1699 			cf |= CF_DRY_RUN;
1700 		else {
1701 			dev_err(DEV, "--dry-run is not supported by peer\n");
1702 			kfree(p);
1703 			return 0;
1704 		}
1705 	}
1706 	p->conn_flags    = cpu_to_be32(cf);
1707 
1708 	if (mdev->agreed_pro_version >= 87)
1709 		strcpy(p->integrity_alg, mdev->net_conf->integrity_alg);
1710 
1711 	rv = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_PROTOCOL,
1712 			   (struct p_header *)p, size);
1713 	kfree(p);
1714 	return rv;
1715 }
1716 
1717 int _drbd_send_uuids(struct drbd_conf *mdev, u64 uuid_flags)
1718 {
1719 	struct p_uuids p;
1720 	int i;
1721 
1722 	if (!get_ldev_if_state(mdev, D_NEGOTIATING))
1723 		return 1;
1724 
1725 	for (i = UI_CURRENT; i < UI_SIZE; i++)
1726 		p.uuid[i] = mdev->ldev ? cpu_to_be64(mdev->ldev->md.uuid[i]) : 0;
1727 
1728 	mdev->comm_bm_set = drbd_bm_total_weight(mdev);
1729 	p.uuid[UI_SIZE] = cpu_to_be64(mdev->comm_bm_set);
1730 	uuid_flags |= mdev->net_conf->want_lose ? 1 : 0;
1731 	uuid_flags |= test_bit(CRASHED_PRIMARY, &mdev->flags) ? 2 : 0;
1732 	uuid_flags |= mdev->new_state_tmp.disk == D_INCONSISTENT ? 4 : 0;
1733 	p.uuid[UI_FLAGS] = cpu_to_be64(uuid_flags);
1734 
1735 	put_ldev(mdev);
1736 
1737 	return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_UUIDS,
1738 			     (struct p_header *)&p, sizeof(p));
1739 }
1740 
1741 int drbd_send_uuids(struct drbd_conf *mdev)
1742 {
1743 	return _drbd_send_uuids(mdev, 0);
1744 }
1745 
1746 int drbd_send_uuids_skip_initial_sync(struct drbd_conf *mdev)
1747 {
1748 	return _drbd_send_uuids(mdev, 8);
1749 }
1750 
1751 
1752 int drbd_send_sync_uuid(struct drbd_conf *mdev, u64 val)
1753 {
1754 	struct p_rs_uuid p;
1755 
1756 	p.uuid = cpu_to_be64(val);
1757 
1758 	return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_SYNC_UUID,
1759 			     (struct p_header *)&p, sizeof(p));
1760 }
1761 
1762 int drbd_send_sizes(struct drbd_conf *mdev, int trigger_reply, enum dds_flags flags)
1763 {
1764 	struct p_sizes p;
1765 	sector_t d_size, u_size;
1766 	int q_order_type;
1767 	int ok;
1768 
1769 	if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
1770 		D_ASSERT(mdev->ldev->backing_bdev);
1771 		d_size = drbd_get_max_capacity(mdev->ldev);
1772 		u_size = mdev->ldev->dc.disk_size;
1773 		q_order_type = drbd_queue_order_type(mdev);
1774 		put_ldev(mdev);
1775 	} else {
1776 		d_size = 0;
1777 		u_size = 0;
1778 		q_order_type = QUEUE_ORDERED_NONE;
1779 	}
1780 
1781 	p.d_size = cpu_to_be64(d_size);
1782 	p.u_size = cpu_to_be64(u_size);
1783 	p.c_size = cpu_to_be64(trigger_reply ? 0 : drbd_get_capacity(mdev->this_bdev));
1784 	p.max_segment_size = cpu_to_be32(queue_max_segment_size(mdev->rq_queue));
1785 	p.queue_order_type = cpu_to_be16(q_order_type);
1786 	p.dds_flags = cpu_to_be16(flags);
1787 
1788 	ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_SIZES,
1789 			   (struct p_header *)&p, sizeof(p));
1790 	return ok;
1791 }
1792 
1793 /**
1794  * drbd_send_state() - Sends the drbd state to the peer
1795  * @mdev:	DRBD device.
1796  */
1797 int drbd_send_state(struct drbd_conf *mdev)
1798 {
1799 	struct socket *sock;
1800 	struct p_state p;
1801 	int ok = 0;
1802 
1803 	/* Grab state lock so we won't send state if we're in the middle
1804 	 * of a cluster-wide state change on another thread */
1805 	drbd_state_lock(mdev);
1806 
1807 	mutex_lock(&mdev->data.mutex);
1808 
1809 	p.state = cpu_to_be32(mdev->state.i); /* Within the send mutex */
1810 	sock = mdev->data.socket;
1811 
1812 	if (likely(sock != NULL)) {
1813 		ok = _drbd_send_cmd(mdev, sock, P_STATE,
1814 				    (struct p_header *)&p, sizeof(p), 0);
1815 	}
1816 
1817 	mutex_unlock(&mdev->data.mutex);
1818 
1819 	drbd_state_unlock(mdev);
1820 	return ok;
1821 }
1822 
1823 int drbd_send_state_req(struct drbd_conf *mdev,
1824 	union drbd_state mask, union drbd_state val)
1825 {
1826 	struct p_req_state p;
1827 
1828 	p.mask    = cpu_to_be32(mask.i);
1829 	p.val     = cpu_to_be32(val.i);
1830 
1831 	return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_STATE_CHG_REQ,
1832 			     (struct p_header *)&p, sizeof(p));
1833 }
1834 
1835 int drbd_send_sr_reply(struct drbd_conf *mdev, int retcode)
1836 {
1837 	struct p_req_state_reply p;
1838 
1839 	p.retcode    = cpu_to_be32(retcode);
1840 
1841 	return drbd_send_cmd(mdev, USE_META_SOCKET, P_STATE_CHG_REPLY,
1842 			     (struct p_header *)&p, sizeof(p));
1843 }
1844 
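/* Compress a chunk of the bitmap into p->code: the bitmap is described as
 * alternating run lengths of clear and set bits, each run length encoded
 * as a variable length integer by vli_encode_bits().  Whether the very
 * first run is a run of set bits is recorded via DCBP_set_start(); the
 * starting bit offset itself is implicit (taken from the transfer context).
 *
 * Rough example (assuming a ten bit chunk whose first four bits are clear,
 * the next four set and the last two clear): the code string carries the
 * run lengths 4, 4, 2 with the start flag cleared.
 *
 * Returns the number of code bytes produced, 0 if RLE must not be used or
 * did not pay off for this chunk, and -1 on an encoding error. */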
1845 int fill_bitmap_rle_bits(struct drbd_conf *mdev,
1846 	struct p_compressed_bm *p,
1847 	struct bm_xfer_ctx *c)
1848 {
1849 	struct bitstream bs;
1850 	unsigned long plain_bits;
1851 	unsigned long tmp;
1852 	unsigned long rl;
1853 	unsigned len;
1854 	unsigned toggle;
1855 	int bits;
1856 
1857 	/* may we use this feature? */
1858 	if ((mdev->sync_conf.use_rle == 0) ||
1859 		(mdev->agreed_pro_version < 90))
1860 			return 0;
1861 
1862 	if (c->bit_offset >= c->bm_bits)
1863 		return 0; /* nothing to do. */
1864 
1865 	/* use at most this many bytes */
1866 	bitstream_init(&bs, p->code, BM_PACKET_VLI_BYTES_MAX, 0);
1867 	memset(p->code, 0, BM_PACKET_VLI_BYTES_MAX);
1868 	/* plain bits covered in this code string */
1869 	plain_bits = 0;
1870 
1871 	/* p->encoding & 0x80 stores whether the first run is a run of set bits.
1872 	 * bit offset is implicit.
1873 	 * start with toggle == 2 to be able to tell the first iteration */
1874 	toggle = 2;
1875 
1876 	/* see how many plain bits we can stuff into one packet
1877 	 * using RLE and VLI. */
1878 	do {
1879 		tmp = (toggle == 0) ? _drbd_bm_find_next_zero(mdev, c->bit_offset)
1880 				    : _drbd_bm_find_next(mdev, c->bit_offset);
1881 		if (tmp == -1UL)
1882 			tmp = c->bm_bits;
1883 		rl = tmp - c->bit_offset;
1884 
1885 		if (toggle == 2) { /* first iteration */
1886 			if (rl == 0) {
1887 				/* the first checked bit was set,
1888 				 * store start value, */
1889 				DCBP_set_start(p, 1);
1890 				/* but skip encoding of zero run length */
1891 				toggle = !toggle;
1892 				continue;
1893 			}
1894 			DCBP_set_start(p, 0);
1895 		}
1896 
1897 		/* paranoia: catch zero runlength.
1898 		 * can only happen if bitmap is modified while we scan it. */
1899 		if (rl == 0) {
1900 			dev_err(DEV, "unexpected zero runlength while encoding bitmap "
1901 			    "t:%u bo:%lu\n", toggle, c->bit_offset);
1902 			return -1;
1903 		}
1904 
1905 		bits = vli_encode_bits(&bs, rl);
1906 		if (bits == -ENOBUFS) /* buffer full */
1907 			break;
1908 		if (bits <= 0) {
1909 			dev_err(DEV, "error while encoding bitmap: %d\n", bits);
1910 			return 0;
1911 		}
1912 
1913 		toggle = !toggle;
1914 		plain_bits += rl;
1915 		c->bit_offset = tmp;
1916 	} while (c->bit_offset < c->bm_bits);
1917 
1918 	len = bs.cur.b - p->code + !!bs.cur.bit;
1919 
1920 	if (plain_bits < (len << 3)) {
1921 		/* incompressible with this method.
1922 		 * we need to rewind both word and bit position. */
1923 		c->bit_offset -= plain_bits;
1924 		bm_xfer_ctx_bit_to_word_offset(c);
1925 		c->bit_offset = c->word_offset * BITS_PER_LONG;
1926 		return 0;
1927 	}
1928 
1929 	/* RLE + VLI was able to compress it just fine.
1930 	 * update c->word_offset. */
1931 	bm_xfer_ctx_bit_to_word_offset(c);
1932 
1933 	/* store pad_bits */
1934 	DCBP_set_pad_bits(p, (8 - bs.cur.bit) & 0x7);
1935 
1936 	return len;
1937 }
1938 
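/* Send the next chunk of the bitmap: compressed, if fill_bitmap_rle_bits()
 * found that worthwhile, otherwise as a plain P_BITMAP packet of up to
 * BM_PACKET_WORDS words.  Returns OK while there is more to send, DONE
 * once the last chunk went out, FAILED on error. */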
1939 enum { OK, FAILED, DONE }
1940 send_bitmap_rle_or_plain(struct drbd_conf *mdev,
1941 	struct p_header *h, struct bm_xfer_ctx *c)
1942 {
1943 	struct p_compressed_bm *p = (void*)h;
1944 	unsigned long num_words;
1945 	int len;
1946 	int ok;
1947 
1948 	len = fill_bitmap_rle_bits(mdev, p, c);
1949 
1950 	if (len < 0)
1951 		return FAILED;
1952 
1953 	if (len) {
1954 		DCBP_set_code(p, RLE_VLI_Bits);
1955 		ok = _drbd_send_cmd(mdev, mdev->data.socket, P_COMPRESSED_BITMAP, h,
1956 			sizeof(*p) + len, 0);
1957 
1958 		c->packets[0]++;
1959 		c->bytes[0] += sizeof(*p) + len;
1960 
1961 		if (c->bit_offset >= c->bm_bits)
1962 			len = 0; /* DONE */
1963 	} else {
1964 		/* was not compressible.
1965 		 * send a buffer full of plain text bits instead. */
1966 		num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
1967 		len = num_words * sizeof(long);
1968 		if (len)
1969 			drbd_bm_get_lel(mdev, c->word_offset, num_words, (unsigned long*)h->payload);
1970 		ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BITMAP,
1971 				   h, sizeof(struct p_header) + len, 0);
1972 		c->word_offset += num_words;
1973 		c->bit_offset = c->word_offset * BITS_PER_LONG;
1974 
1975 		c->packets[1]++;
1976 		c->bytes[1] += sizeof(struct p_header) + len;
1977 
1978 		if (c->bit_offset > c->bm_bits)
1979 			c->bit_offset = c->bm_bits;
1980 	}
1981 	ok = ok ? ((len == 0) ? DONE : OK) : FAILED;
1982 
1983 	if (ok == DONE)
1984 		INFO_bm_xfer_stats(mdev, "send", c);
1985 	return ok;
1986 }
1987 
1988 /* See the comment at receive_bitmap() */
1989 int _drbd_send_bitmap(struct drbd_conf *mdev)
1990 {
1991 	struct bm_xfer_ctx c;
1992 	struct p_header *p;
1993 	int ret;
1994 
1995 	ERR_IF(!mdev->bitmap) return FALSE;
1996 
1997 	/* maybe we should use some per thread scratch page,
1998 	 * and allocate that during initial device creation? */
1999 	p = (struct p_header *) __get_free_page(GFP_NOIO);
2000 	if (!p) {
2001 		dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
2002 		return FALSE;
2003 	}
2004 
2005 	if (get_ldev(mdev)) {
2006 		if (drbd_md_test_flag(mdev->ldev, MDF_FULL_SYNC)) {
2007 			dev_info(DEV, "Writing the whole bitmap, MDF_FullSync was set.\n");
2008 			drbd_bm_set_all(mdev);
2009 			if (drbd_bm_write(mdev)) {
2010 				/* write_bm did fail! Leave the full sync flag set in the meta data,
2011 				 * but otherwise process as per normal - need to tell the other
2012 				 * side that a full resync is required! */
2013 				dev_err(DEV, "Failed to write bitmap to disk!\n");
2014 			} else {
2015 				drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
2016 				drbd_md_sync(mdev);
2017 			}
2018 		}
2019 		put_ldev(mdev);
2020 	}
2021 
2022 	c = (struct bm_xfer_ctx) {
2023 		.bm_bits = drbd_bm_bits(mdev),
2024 		.bm_words = drbd_bm_words(mdev),
2025 	};
2026 
2027 	do {
2028 		ret = send_bitmap_rle_or_plain(mdev, p, &c);
2029 	} while (ret == OK);
2030 
2031 	free_page((unsigned long) p);
2032 	return (ret == DONE);
2033 }
2034 
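/* Note the inverted return convention compared to _drbd_send_bitmap():
 * 0 on success, non-zero on failure (-1 if the data socket could not be
 * grabbed). */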
2035 int drbd_send_bitmap(struct drbd_conf *mdev)
2036 {
2037 	int err;
2038 
2039 	if (!drbd_get_data_sock(mdev))
2040 		return -1;
2041 	err = !_drbd_send_bitmap(mdev);
2042 	drbd_put_data_sock(mdev);
2043 	return err;
2044 }
2045 
2046 int drbd_send_b_ack(struct drbd_conf *mdev, u32 barrier_nr, u32 set_size)
2047 {
2048 	int ok;
2049 	struct p_barrier_ack p;
2050 
2051 	p.barrier  = barrier_nr;
2052 	p.set_size = cpu_to_be32(set_size);
2053 
2054 	if (mdev->state.conn < C_CONNECTED)
2055 		return FALSE;
2056 	ok = drbd_send_cmd(mdev, USE_META_SOCKET, P_BARRIER_ACK,
2057 			(struct p_header *)&p, sizeof(p));
2058 	return ok;
2059 }
2060 
2061 /**
2062  * _drbd_send_ack() - Sends an ack packet
2063  * @mdev:	DRBD device.
2064  * @cmd:	Packet command code.
2065  * @sector:	sector, needs to be in big endian byte order
2066  * @blksize:	size in bytes, needs to be in big endian byte order
2067  * @block_id:	Id, big endian byte order
2068  */
2069 static int _drbd_send_ack(struct drbd_conf *mdev, enum drbd_packets cmd,
2070 			  u64 sector,
2071 			  u32 blksize,
2072 			  u64 block_id)
2073 {
2074 	int ok;
2075 	struct p_block_ack p;
2076 
2077 	p.sector   = sector;
2078 	p.block_id = block_id;
2079 	p.blksize  = blksize;
2080 	p.seq_num  = cpu_to_be32(atomic_add_return(1, &mdev->packet_seq));
2081 
2082 	if (!mdev->meta.socket || mdev->state.conn < C_CONNECTED)
2083 		return FALSE;
2084 	ok = drbd_send_cmd(mdev, USE_META_SOCKET, cmd,
2085 				(struct p_header *)&p, sizeof(p));
2086 	return ok;
2087 }
2088 
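/* The _drbd_send_ack() wrappers below differ only in where the already
 * big endian fields come from: drbd_send_ack_rp() echoes the request
 * packet verbatim, drbd_send_ack_dp() echoes sector and block_id but
 * recomputes the payload size, while drbd_send_ack() and
 * drbd_send_ack_ex() convert from CPU byte order themselves. */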
2089 int drbd_send_ack_dp(struct drbd_conf *mdev, enum drbd_packets cmd,
2090 		     struct p_data *dp)
2091 {
2092 	const int header_size = sizeof(struct p_data)
2093 			      - sizeof(struct p_header);
2094 	int data_size  = ((struct p_header *)dp)->length - header_size;
2095 
2096 	return _drbd_send_ack(mdev, cmd, dp->sector, cpu_to_be32(data_size),
2097 			      dp->block_id);
2098 }
2099 
2100 int drbd_send_ack_rp(struct drbd_conf *mdev, enum drbd_packets cmd,
2101 		     struct p_block_req *rp)
2102 {
2103 	return _drbd_send_ack(mdev, cmd, rp->sector, rp->blksize, rp->block_id);
2104 }
2105 
2106 /**
2107  * drbd_send_ack() - Sends an ack packet
2108  * @mdev:	DRBD device.
2109  * @cmd:	Packet command code.
2110  * @e:		Epoch entry.
2111  */
2112 int drbd_send_ack(struct drbd_conf *mdev,
2113 	enum drbd_packets cmd, struct drbd_epoch_entry *e)
2114 {
2115 	return _drbd_send_ack(mdev, cmd,
2116 			      cpu_to_be64(e->sector),
2117 			      cpu_to_be32(e->size),
2118 			      e->block_id);
2119 }
2120 
2121 /* This function misuses the block_id field to signal if the blocks
2122  * are in sync or not. */
2123 int drbd_send_ack_ex(struct drbd_conf *mdev, enum drbd_packets cmd,
2124 		     sector_t sector, int blksize, u64 block_id)
2125 {
2126 	return _drbd_send_ack(mdev, cmd,
2127 			      cpu_to_be64(sector),
2128 			      cpu_to_be32(blksize),
2129 			      cpu_to_be64(block_id));
2130 }
2131 
2132 int drbd_send_drequest(struct drbd_conf *mdev, int cmd,
2133 		       sector_t sector, int size, u64 block_id)
2134 {
2135 	int ok;
2136 	struct p_block_req p;
2137 
2138 	p.sector   = cpu_to_be64(sector);
2139 	p.block_id = block_id;
2140 	p.blksize  = cpu_to_be32(size);
2141 
2142 	ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, cmd,
2143 				(struct p_header *)&p, sizeof(p));
2144 	return ok;
2145 }
2146 
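/* Like drbd_send_drequest(), but with a checksum digest appended right
 * after the fixed size request: head.length accounts for the digest, and
 * header and digest are pushed with two drbd_send() calls under
 * data.mutex, taken directly here instead of via drbd_get_data_sock(). */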
2147 int drbd_send_drequest_csum(struct drbd_conf *mdev,
2148 			    sector_t sector, int size,
2149 			    void *digest, int digest_size,
2150 			    enum drbd_packets cmd)
2151 {
2152 	int ok;
2153 	struct p_block_req p;
2154 
2155 	p.sector   = cpu_to_be64(sector);
2156 	p.block_id = BE_DRBD_MAGIC + 0xbeef;
2157 	p.blksize  = cpu_to_be32(size);
2158 
2159 	p.head.magic   = BE_DRBD_MAGIC;
2160 	p.head.command = cpu_to_be16(cmd);
2161 	p.head.length  = cpu_to_be16(sizeof(p) - sizeof(struct p_header) + digest_size);
2162 
2163 	mutex_lock(&mdev->data.mutex);
2164 
2165 	ok = (sizeof(p) == drbd_send(mdev, mdev->data.socket, &p, sizeof(p), 0));
2166 	ok = ok && (digest_size == drbd_send(mdev, mdev->data.socket, digest, digest_size, 0));
2167 
2168 	mutex_unlock(&mdev->data.mutex);
2169 
2170 	return ok;
2171 }
2172 
2173 int drbd_send_ov_request(struct drbd_conf *mdev, sector_t sector, int size)
2174 {
2175 	int ok;
2176 	struct p_block_req p;
2177 
2178 	p.sector   = cpu_to_be64(sector);
2179 	p.block_id = BE_DRBD_MAGIC + 0xbabe;
2180 	p.blksize  = cpu_to_be32(size);
2181 
2182 	ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_OV_REQUEST,
2183 			   (struct p_header *)&p, sizeof(p));
2184 	return ok;
2185 }
2186 
2187 /* called on sndtimeo
2188  * returns FALSE if we should retry,
2189  * TRUE if we think the connection is dead
2190  */
2191 static int we_should_drop_the_connection(struct drbd_conf *mdev, struct socket *sock)
2192 {
2193 	int drop_it;
2194 	/* long elapsed = (long)(jiffies - mdev->last_received); */
2195 
2196 	drop_it =   mdev->meta.socket == sock
2197 		|| !mdev->asender.task
2198 		|| get_t_state(&mdev->asender) != Running
2199 		|| mdev->state.conn < C_CONNECTED;
2200 
2201 	if (drop_it)
2202 		return TRUE;
2203 
2204 	drop_it = !--mdev->ko_count;
2205 	if (!drop_it) {
2206 		dev_err(DEV, "[%s/%d] sock_sendmsg time expired, ko = %u\n",
2207 		       current->comm, current->pid, mdev->ko_count);
2208 		request_ping(mdev);
2209 	}
2210 
2211 	return drop_it; /* && (mdev->state == R_PRIMARY) */;
2212 }
2213 
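/* ko_count is reloaded from net_conf->ko_count before every send on the
 * data socket (see drbd_send()) and decremented above on each send
 * timeout; once it reaches zero we give up on the peer.  The connection
 * is dropped immediately if the timeout hit the meta socket, the asender
 * is gone, or we are no longer connected. */
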
2214 /* The idea of sendpage seems to be to put some kind of reference
2215  * to the page into the skb, and to hand it over to the NIC. In
2216  * this process get_page() gets called.
2217  *
2218  * As soon as the page was really sent over the network put_page()
2219  * gets called by some part of the network layer. [ NIC driver? ]
2220  *
2221  * [ get_page() / put_page() increment/decrement the count. If count
2222  *   reaches 0 the page will be freed. ]
2223  *
2224  * This works nicely with pages from FSs.
2225  * But this means that in protocol A we might signal IO completion too early!
2226  *
2227  * In order not to corrupt data during a resync we must make sure
2228  * that we do not reuse our own buffer pages (EEs) too early, therefore
2229  * we have the net_ee list.
2230  *
2231  * XFS seems to have problems, still, it submits pages with page_count == 0!
2232  * As a workaround, we disable sendpage on pages
2233  * with page_count == 0 or PageSlab.
2234  */
2235 static int _drbd_no_send_page(struct drbd_conf *mdev, struct page *page,
2236 		   int offset, size_t size, unsigned msg_flags)
2237 {
2238 	int sent = drbd_send(mdev, mdev->data.socket, kmap(page) + offset, size, msg_flags);
2239 	kunmap(page);
2240 	if (sent == size)
2241 		mdev->send_cnt += size>>9;
2242 	return sent == size;
2243 }
2244 
2245 static int _drbd_send_page(struct drbd_conf *mdev, struct page *page,
2246 		    int offset, size_t size, unsigned msg_flags)
2247 {
2248 	mm_segment_t oldfs = get_fs();
2249 	int sent, ok;
2250 	int len = size;
2251 
2252 	/* e.g. XFS meta- & log-data is in slab pages, which have a
2253 	 * page_count of 0 and/or have PageSlab() set.
2254 	 * we cannot use send_page for those, as that does get_page();
2255 	 * put_page(); and would cause either a VM_BUG directly, or
2256 	 * __page_cache_release a page that would actually still be referenced
2257 	 * by someone, leading to some obscure delayed Oops somewhere else. */
2258 	if (disable_sendpage || (page_count(page) < 1) || PageSlab(page))
2259 		return _drbd_no_send_page(mdev, page, offset, size, msg_flags);
2260 
2261 	msg_flags |= MSG_NOSIGNAL;
2262 	drbd_update_congested(mdev);
2263 	set_fs(KERNEL_DS);
2264 	do {
2265 		sent = mdev->data.socket->ops->sendpage(mdev->data.socket, page,
2266 							offset, len,
2267 							msg_flags);
2268 		if (sent == -EAGAIN) {
2269 			if (we_should_drop_the_connection(mdev,
2270 							  mdev->data.socket))
2271 				break;
2272 			else
2273 				continue;
2274 		}
2275 		if (sent <= 0) {
2276 			dev_warn(DEV, "%s: size=%d len=%d sent=%d\n",
2277 			     __func__, (int)size, len, sent);
2278 			break;
2279 		}
2280 		len    -= sent;
2281 		offset += sent;
2282 	} while (len > 0 /* THINK && mdev->cstate >= C_CONNECTED*/);
2283 	set_fs(oldfs);
2284 	clear_bit(NET_CONGESTED, &mdev->flags);
2285 
2286 	ok = (len == 0);
2287 	if (likely(ok))
2288 		mdev->send_cnt += size>>9;
2289 	return ok;
2290 }
2291 
2292 static int _drbd_send_bio(struct drbd_conf *mdev, struct bio *bio)
2293 {
2294 	struct bio_vec *bvec;
2295 	int i;
2296 	/* hint all but last page with MSG_MORE */
2297 	__bio_for_each_segment(bvec, bio, i, 0) {
2298 		if (!_drbd_no_send_page(mdev, bvec->bv_page,
2299 				     bvec->bv_offset, bvec->bv_len,
2300 				     i == bio->bi_vcnt -1 ? 0 : MSG_MORE))
2301 			return 0;
2302 	}
2303 	return 1;
2304 }
2305 
2306 static int _drbd_send_zc_bio(struct drbd_conf *mdev, struct bio *bio)
2307 {
2308 	struct bio_vec *bvec;
2309 	int i;
2310 	/* hint all but last page with MSG_MORE */
2311 	__bio_for_each_segment(bvec, bio, i, 0) {
2312 		if (!_drbd_send_page(mdev, bvec->bv_page,
2313 				     bvec->bv_offset, bvec->bv_len,
2314 				     i == bio->bi_vcnt -1 ? 0 : MSG_MORE))
2315 			return 0;
2316 	}
2317 	return 1;
2318 }
2319 
2320 static int _drbd_send_zc_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
2321 {
2322 	struct page *page = e->pages;
2323 	unsigned len = e->size;
2324 	/* hint all but last page with MSG_MORE */
2325 	page_chain_for_each(page) {
2326 		unsigned l = min_t(unsigned, len, PAGE_SIZE);
2327 		if (!_drbd_send_page(mdev, page, 0, l,
2328 				page_chain_next(page) ? MSG_MORE : 0))
2329 			return 0;
2330 		len -= l;
2331 	}
2332 	return 1;
2333 }
2334 
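/* For protocol A, drbd_send_dblock() below copies the bio payload through
 * _drbd_send_bio()/_drbd_no_send_page() instead of using the zero-copy
 * sendpage path: with sendpage the pages may still be referenced by the
 * network stack when we complete the request towards the upper layers,
 * which would be too early for protocol A (see the sendpage comment
 * further up). */
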
2335 /* Used to send write requests
2336  * R_PRIMARY -> Peer	(P_DATA)
2337  */
2338 int drbd_send_dblock(struct drbd_conf *mdev, struct drbd_request *req)
2339 {
2340 	int ok = 1;
2341 	struct p_data p;
2342 	unsigned int dp_flags = 0;
2343 	void *dgb;
2344 	int dgs;
2345 
2346 	if (!drbd_get_data_sock(mdev))
2347 		return 0;
2348 
2349 	dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_w_tfm) ?
2350 		crypto_hash_digestsize(mdev->integrity_w_tfm) : 0;
2351 
2352 	p.head.magic   = BE_DRBD_MAGIC;
2353 	p.head.command = cpu_to_be16(P_DATA);
2354 	p.head.length  =
2355 		cpu_to_be16(sizeof(p) - sizeof(struct p_header) + dgs + req->size);
2356 
2357 	p.sector   = cpu_to_be64(req->sector);
2358 	p.block_id = (unsigned long)req;
2359 	p.seq_num  = cpu_to_be32(req->seq_num =
2360 				 atomic_add_return(1, &mdev->packet_seq));
2361 	dp_flags = 0;
2362 
2363 	/* NOTE: no need to check if barriers are supported here, as we would
2364 	 *       not pass the test in make_request_common in that case
2365 	 */
2366 	if (req->master_bio->bi_rw & REQ_HARDBARRIER) {
2367 		dev_err(DEV, "ASSERT FAILED would have set DP_HARDBARRIER\n");
2368 		/* dp_flags |= DP_HARDBARRIER; */
2369 	}
2370 	if (req->master_bio->bi_rw & REQ_SYNC)
2371 		dp_flags |= DP_RW_SYNC;
2372 	/* for now handle SYNCIO and UNPLUG
2373 	 * as if they were still one and the same flag */
2374 	if (req->master_bio->bi_rw & REQ_UNPLUG)
2375 		dp_flags |= DP_RW_SYNC;
2376 	if (mdev->state.conn >= C_SYNC_SOURCE &&
2377 	    mdev->state.conn <= C_PAUSED_SYNC_T)
2378 		dp_flags |= DP_MAY_SET_IN_SYNC;
2379 
2380 	p.dp_flags = cpu_to_be32(dp_flags);
2381 	set_bit(UNPLUG_REMOTE, &mdev->flags);
2382 	ok = (sizeof(p) ==
2383 		drbd_send(mdev, mdev->data.socket, &p, sizeof(p), dgs ? MSG_MORE : 0));
2384 	if (ok && dgs) {
2385 		dgb = mdev->int_dig_out;
2386 		drbd_csum_bio(mdev, mdev->integrity_w_tfm, req->master_bio, dgb);
2387 		ok = drbd_send(mdev, mdev->data.socket, dgb, dgs, 0);
2388 	}
2389 	if (ok) {
2390 		if (mdev->net_conf->wire_protocol == DRBD_PROT_A)
2391 			ok = _drbd_send_bio(mdev, req->master_bio);
2392 		else
2393 			ok = _drbd_send_zc_bio(mdev, req->master_bio);
2394 	}
2395 
2396 	drbd_put_data_sock(mdev);
2397 
2398 	return ok;
2399 }
2400 
2401 /* answer packet, used to send data back for read requests:
2402  *  Peer       -> (diskless) R_PRIMARY   (P_DATA_REPLY)
2403  *  C_SYNC_SOURCE -> C_SYNC_TARGET         (P_RS_DATA_REPLY)
2404  */
2405 int drbd_send_block(struct drbd_conf *mdev, enum drbd_packets cmd,
2406 		    struct drbd_epoch_entry *e)
2407 {
2408 	int ok;
2409 	struct p_data p;
2410 	void *dgb;
2411 	int dgs;
2412 
2413 	dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_w_tfm) ?
2414 		crypto_hash_digestsize(mdev->integrity_w_tfm) : 0;
2415 
2416 	p.head.magic   = BE_DRBD_MAGIC;
2417 	p.head.command = cpu_to_be16(cmd);
2418 	p.head.length  =
2419 		cpu_to_be16(sizeof(p) - sizeof(struct p_header) + dgs + e->size);
2420 
2421 	p.sector   = cpu_to_be64(e->sector);
2422 	p.block_id = e->block_id;
2423 	/* p.seq_num  = 0;    No sequence numbers here.. */
2424 
2425 	/* Only called by our kernel thread.
2426 	 * This one may be interrupted by DRBD_SIG and/or DRBD_SIGKILL
2427 	 * in response to admin command or module unload.
2428 	 */
2429 	if (!drbd_get_data_sock(mdev))
2430 		return 0;
2431 
2432 	ok = sizeof(p) == drbd_send(mdev, mdev->data.socket, &p,
2433 					sizeof(p), dgs ? MSG_MORE : 0);
2434 	if (ok && dgs) {
2435 		dgb = mdev->int_dig_out;
2436 		drbd_csum_ee(mdev, mdev->integrity_w_tfm, e, dgb);
2437 		ok = drbd_send(mdev, mdev->data.socket, dgb, dgs, 0);
2438 	}
2439 	if (ok)
2440 		ok = _drbd_send_zc_ee(mdev, e);
2441 
2442 	drbd_put_data_sock(mdev);
2443 
2444 	return ok;
2445 }
2446 
2447 /*
2448   drbd_send distinguishes two cases:
2449 
2450   Packets sent via the data socket "sock"
2451   and packets sent via the meta data socket "msock"
2452 
2453 		    sock                      msock
2454   -----------------+-------------------------+------------------------------
2455   timeout           conf.timeout / 2          conf.timeout / 2
2456   timeout action    send a ping via msock     Abort communication
2457 					      and close all sockets
2458 */
2459 
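/* drbd_send() returns the number of bytes that actually went out (-1000
 * if there is no socket at all).  Anything less than "size" means the
 * send failed; in that case the connection is forced to C_TIMEOUT if the
 * last error was a timeout (-EAGAIN), to C_BROKEN_PIPE otherwise. */
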
2460 /*
2461  * you must have locked the appropriate [m]sock mutex elsewhere!
2462  */
2463 int drbd_send(struct drbd_conf *mdev, struct socket *sock,
2464 	      void *buf, size_t size, unsigned msg_flags)
2465 {
2466 	struct kvec iov;
2467 	struct msghdr msg;
2468 	int rv, sent = 0;
2469 
2470 	if (!sock)
2471 		return -1000;
2472 
2473 	/* THINK  if (signal_pending) return ... ? */
2474 
2475 	iov.iov_base = buf;
2476 	iov.iov_len  = size;
2477 
2478 	msg.msg_name       = NULL;
2479 	msg.msg_namelen    = 0;
2480 	msg.msg_control    = NULL;
2481 	msg.msg_controllen = 0;
2482 	msg.msg_flags      = msg_flags | MSG_NOSIGNAL;
2483 
2484 	if (sock == mdev->data.socket) {
2485 		mdev->ko_count = mdev->net_conf->ko_count;
2486 		drbd_update_congested(mdev);
2487 	}
2488 	do {
2489 		/* STRANGE
2490 		 * tcp_sendmsg does _not_ use its size parameter at all ?
2491 		 *
2492 		 * -EAGAIN on timeout, -EINTR on signal.
2493 		 */
2494 /* THINK
2495  * do we need to block DRBD_SIG if sock == &meta.socket ??
2496  * otherwise wake_asender() might interrupt some send_*Ack !
2497  */
2498 		rv = kernel_sendmsg(sock, &msg, &iov, 1, size);
2499 		if (rv == -EAGAIN) {
2500 			if (we_should_drop_the_connection(mdev, sock))
2501 				break;
2502 			else
2503 				continue;
2504 		}
2505 		D_ASSERT(rv != 0);
2506 		if (rv == -EINTR) {
2507 			flush_signals(current);
2508 			rv = 0;
2509 		}
2510 		if (rv < 0)
2511 			break;
2512 		sent += rv;
2513 		iov.iov_base += rv;
2514 		iov.iov_len  -= rv;
2515 	} while (sent < size);
2516 
2517 	if (sock == mdev->data.socket)
2518 		clear_bit(NET_CONGESTED, &mdev->flags);
2519 
2520 	if (rv <= 0) {
2521 		if (rv != -EAGAIN) {
2522 			dev_err(DEV, "%s_sendmsg returned %d\n",
2523 			    sock == mdev->meta.socket ? "msock" : "sock",
2524 			    rv);
2525 			drbd_force_state(mdev, NS(conn, C_BROKEN_PIPE));
2526 		} else
2527 			drbd_force_state(mdev, NS(conn, C_TIMEOUT));
2528 	}
2529 
2530 	return sent;
2531 }
2532 
2533 static int drbd_open(struct block_device *bdev, fmode_t mode)
2534 {
2535 	struct drbd_conf *mdev = bdev->bd_disk->private_data;
2536 	unsigned long flags;
2537 	int rv = 0;
2538 
2539 	lock_kernel();
2540 	spin_lock_irqsave(&mdev->req_lock, flags);
2541 	/* to have a stable mdev->state.role
2542 	 * and no race with updating open_cnt */
2543 
2544 	if (mdev->state.role != R_PRIMARY) {
2545 		if (mode & FMODE_WRITE)
2546 			rv = -EROFS;
2547 		else if (!allow_oos)
2548 			rv = -EMEDIUMTYPE;
2549 	}
2550 
2551 	if (!rv)
2552 		mdev->open_cnt++;
2553 	spin_unlock_irqrestore(&mdev->req_lock, flags);
2554 	unlock_kernel();
2555 
2556 	return rv;
2557 }
2558 
2559 static int drbd_release(struct gendisk *gd, fmode_t mode)
2560 {
2561 	struct drbd_conf *mdev = gd->private_data;
2562 	lock_kernel();
2563 	mdev->open_cnt--;
2564 	unlock_kernel();
2565 	return 0;
2566 }
2567 
2568 static void drbd_unplug_fn(struct request_queue *q)
2569 {
2570 	struct drbd_conf *mdev = q->queuedata;
2571 
2572 	/* unplug FIRST */
2573 	spin_lock_irq(q->queue_lock);
2574 	blk_remove_plug(q);
2575 	spin_unlock_irq(q->queue_lock);
2576 
2577 	/* only if connected */
2578 	spin_lock_irq(&mdev->req_lock);
2579 	if (mdev->state.pdsk >= D_INCONSISTENT && mdev->state.conn >= C_CONNECTED) {
2580 		D_ASSERT(mdev->state.role == R_PRIMARY);
2581 		if (test_and_clear_bit(UNPLUG_REMOTE, &mdev->flags)) {
2582 			/* add to the data.work queue,
2583 			 * unless already queued.
2584 			 * XXX this might be a good addition to drbd_queue_work
2585 			 * anyways, to detect "double queuing" ... */
2586 			if (list_empty(&mdev->unplug_work.list))
2587 				drbd_queue_work(&mdev->data.work,
2588 						&mdev->unplug_work);
2589 		}
2590 	}
2591 	spin_unlock_irq(&mdev->req_lock);
2592 
2593 	if (mdev->state.disk >= D_INCONSISTENT)
2594 		drbd_kick_lo(mdev);
2595 }
2596 
2597 static void drbd_set_defaults(struct drbd_conf *mdev)
2598 {
2599 	/* This way we get a compile error when sync_conf grows,
2600 	   and we forget to initialize it here */
2601 	mdev->sync_conf = (struct syncer_conf) {
2602 		/* .rate = */		DRBD_RATE_DEF,
2603 		/* .after = */		DRBD_AFTER_DEF,
2604 		/* .al_extents = */	DRBD_AL_EXTENTS_DEF,
2605 		/* .verify_alg = */	{}, 0,
2606 		/* .cpu_mask = */	{}, 0,
2607 		/* .csums_alg = */	{}, 0,
2608 		/* .use_rle = */	0
2609 	};
2610 
2611 	/* Have to do it this way, because the layout differs between
2612 	   big endian and little endian */
2613 	mdev->state = (union drbd_state) {
2614 		{ .role = R_SECONDARY,
2615 		  .peer = R_UNKNOWN,
2616 		  .conn = C_STANDALONE,
2617 		  .disk = D_DISKLESS,
2618 		  .pdsk = D_UNKNOWN,
2619 		  .susp = 0
2620 		} };
2621 }
2622 
2623 void drbd_init_set_defaults(struct drbd_conf *mdev)
2624 {
2625 	/* the memset(,0,) did most of this.
2626 	 * note: only assignments, no allocation in here */
2627 
2628 	drbd_set_defaults(mdev);
2629 
2630 	/* for now, we do NOT yet support it,
2631 	 * even though we start some framework
2632 	 * to eventually support barriers */
2633 	set_bit(NO_BARRIER_SUPP, &mdev->flags);
2634 
2635 	atomic_set(&mdev->ap_bio_cnt, 0);
2636 	atomic_set(&mdev->ap_pending_cnt, 0);
2637 	atomic_set(&mdev->rs_pending_cnt, 0);
2638 	atomic_set(&mdev->unacked_cnt, 0);
2639 	atomic_set(&mdev->local_cnt, 0);
2640 	atomic_set(&mdev->net_cnt, 0);
2641 	atomic_set(&mdev->packet_seq, 0);
2642 	atomic_set(&mdev->pp_in_use, 0);
2643 
2644 	mutex_init(&mdev->md_io_mutex);
2645 	mutex_init(&mdev->data.mutex);
2646 	mutex_init(&mdev->meta.mutex);
2647 	sema_init(&mdev->data.work.s, 0);
2648 	sema_init(&mdev->meta.work.s, 0);
2649 	mutex_init(&mdev->state_mutex);
2650 
2651 	spin_lock_init(&mdev->data.work.q_lock);
2652 	spin_lock_init(&mdev->meta.work.q_lock);
2653 
2654 	spin_lock_init(&mdev->al_lock);
2655 	spin_lock_init(&mdev->req_lock);
2656 	spin_lock_init(&mdev->peer_seq_lock);
2657 	spin_lock_init(&mdev->epoch_lock);
2658 
2659 	INIT_LIST_HEAD(&mdev->active_ee);
2660 	INIT_LIST_HEAD(&mdev->sync_ee);
2661 	INIT_LIST_HEAD(&mdev->done_ee);
2662 	INIT_LIST_HEAD(&mdev->read_ee);
2663 	INIT_LIST_HEAD(&mdev->net_ee);
2664 	INIT_LIST_HEAD(&mdev->resync_reads);
2665 	INIT_LIST_HEAD(&mdev->data.work.q);
2666 	INIT_LIST_HEAD(&mdev->meta.work.q);
2667 	INIT_LIST_HEAD(&mdev->resync_work.list);
2668 	INIT_LIST_HEAD(&mdev->unplug_work.list);
2669 	INIT_LIST_HEAD(&mdev->md_sync_work.list);
2670 	INIT_LIST_HEAD(&mdev->bm_io_work.w.list);
2671 
2672 	mdev->resync_work.cb  = w_resync_inactive;
2673 	mdev->unplug_work.cb  = w_send_write_hint;
2674 	mdev->md_sync_work.cb = w_md_sync;
2675 	mdev->bm_io_work.w.cb = w_bitmap_io;
2676 	init_timer(&mdev->resync_timer);
2677 	init_timer(&mdev->md_sync_timer);
2678 	mdev->resync_timer.function = resync_timer_fn;
2679 	mdev->resync_timer.data = (unsigned long) mdev;
2680 	mdev->md_sync_timer.function = md_sync_timer_fn;
2681 	mdev->md_sync_timer.data = (unsigned long) mdev;
2682 
2683 	init_waitqueue_head(&mdev->misc_wait);
2684 	init_waitqueue_head(&mdev->state_wait);
2685 	init_waitqueue_head(&mdev->ee_wait);
2686 	init_waitqueue_head(&mdev->al_wait);
2687 	init_waitqueue_head(&mdev->seq_wait);
2688 
2689 	drbd_thread_init(mdev, &mdev->receiver, drbdd_init);
2690 	drbd_thread_init(mdev, &mdev->worker, drbd_worker);
2691 	drbd_thread_init(mdev, &mdev->asender, drbd_asender);
2692 
2693 	mdev->agreed_pro_version = PRO_VERSION_MAX;
2694 	mdev->write_ordering = WO_bio_barrier;
2695 	mdev->resync_wenr = LC_FREE;
2696 }
2697 
2698 void drbd_mdev_cleanup(struct drbd_conf *mdev)
2699 {
2700 	if (mdev->receiver.t_state != None)
2701 		dev_err(DEV, "ASSERT FAILED: receiver t_state == %d expected 0.\n",
2702 				mdev->receiver.t_state);
2703 
2704 	/* no need to lock it, I'm the only thread alive */
2705 	if (atomic_read(&mdev->current_epoch->epoch_size) !=  0)
2706 		dev_err(DEV, "epoch_size:%d\n", atomic_read(&mdev->current_epoch->epoch_size));
2707 	mdev->al_writ_cnt  =
2708 	mdev->bm_writ_cnt  =
2709 	mdev->read_cnt     =
2710 	mdev->recv_cnt     =
2711 	mdev->send_cnt     =
2712 	mdev->writ_cnt     =
2713 	mdev->p_size       =
2714 	mdev->rs_start     =
2715 	mdev->rs_total     =
2716 	mdev->rs_failed    =
2717 	mdev->rs_mark_left =
2718 	mdev->rs_mark_time = 0;
2719 	D_ASSERT(mdev->net_conf == NULL);
2720 
2721 	drbd_set_my_capacity(mdev, 0);
2722 	if (mdev->bitmap) {
2723 		/* maybe never allocated. */
2724 		drbd_bm_resize(mdev, 0, 1);
2725 		drbd_bm_cleanup(mdev);
2726 	}
2727 
2728 	drbd_free_resources(mdev);
2729 
2730 	/*
2731 	 * currently we drbd_init_ee only on module load, so
2732 	 * we may do drbd_release_ee only on module unload!
2733 	 */
2734 	D_ASSERT(list_empty(&mdev->active_ee));
2735 	D_ASSERT(list_empty(&mdev->sync_ee));
2736 	D_ASSERT(list_empty(&mdev->done_ee));
2737 	D_ASSERT(list_empty(&mdev->read_ee));
2738 	D_ASSERT(list_empty(&mdev->net_ee));
2739 	D_ASSERT(list_empty(&mdev->resync_reads));
2740 	D_ASSERT(list_empty(&mdev->data.work.q));
2741 	D_ASSERT(list_empty(&mdev->meta.work.q));
2742 	D_ASSERT(list_empty(&mdev->resync_work.list));
2743 	D_ASSERT(list_empty(&mdev->unplug_work.list));
2744 
2745 }
2746 
2747 
2748 static void drbd_destroy_mempools(void)
2749 {
2750 	struct page *page;
2751 
2752 	while (drbd_pp_pool) {
2753 		page = drbd_pp_pool;
2754 		drbd_pp_pool = (struct page *)page_private(page);
2755 		__free_page(page);
2756 		drbd_pp_vacant--;
2757 	}
2758 
2759 	/* D_ASSERT(atomic_read(&drbd_pp_vacant)==0); */
2760 
2761 	if (drbd_ee_mempool)
2762 		mempool_destroy(drbd_ee_mempool);
2763 	if (drbd_request_mempool)
2764 		mempool_destroy(drbd_request_mempool);
2765 	if (drbd_ee_cache)
2766 		kmem_cache_destroy(drbd_ee_cache);
2767 	if (drbd_request_cache)
2768 		kmem_cache_destroy(drbd_request_cache);
2769 	if (drbd_bm_ext_cache)
2770 		kmem_cache_destroy(drbd_bm_ext_cache);
2771 	if (drbd_al_ext_cache)
2772 		kmem_cache_destroy(drbd_al_ext_cache);
2773 
2774 	drbd_ee_mempool      = NULL;
2775 	drbd_request_mempool = NULL;
2776 	drbd_ee_cache        = NULL;
2777 	drbd_request_cache   = NULL;
2778 	drbd_bm_ext_cache    = NULL;
2779 	drbd_al_ext_cache    = NULL;
2780 
2781 	return;
2782 }
2783 
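/* The private page pool feeds the receive buffers: "number" pages, i.e.
 * one maximum-sized segment's worth per configured minor, kept on a
 * singly linked list threaded through page_private(), counted by
 * drbd_pp_vacant and later guarded by drbd_pp_lock. */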
2784 static int drbd_create_mempools(void)
2785 {
2786 	struct page *page;
2787 	const int number = (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE) * minor_count;
2788 	int i;
2789 
2790 	/* prepare our caches and mempools */
2791 	drbd_request_mempool = NULL;
2792 	drbd_ee_cache        = NULL;
2793 	drbd_request_cache   = NULL;
2794 	drbd_bm_ext_cache    = NULL;
2795 	drbd_al_ext_cache    = NULL;
2796 	drbd_pp_pool         = NULL;
2797 
2798 	/* caches */
2799 	drbd_request_cache = kmem_cache_create(
2800 		"drbd_req", sizeof(struct drbd_request), 0, 0, NULL);
2801 	if (drbd_request_cache == NULL)
2802 		goto Enomem;
2803 
2804 	drbd_ee_cache = kmem_cache_create(
2805 		"drbd_ee", sizeof(struct drbd_epoch_entry), 0, 0, NULL);
2806 	if (drbd_ee_cache == NULL)
2807 		goto Enomem;
2808 
2809 	drbd_bm_ext_cache = kmem_cache_create(
2810 		"drbd_bm", sizeof(struct bm_extent), 0, 0, NULL);
2811 	if (drbd_bm_ext_cache == NULL)
2812 		goto Enomem;
2813 
2814 	drbd_al_ext_cache = kmem_cache_create(
2815 		"drbd_al", sizeof(struct lc_element), 0, 0, NULL);
2816 	if (drbd_al_ext_cache == NULL)
2817 		goto Enomem;
2818 
2819 	/* mempools */
2820 	drbd_request_mempool = mempool_create(number,
2821 		mempool_alloc_slab, mempool_free_slab, drbd_request_cache);
2822 	if (drbd_request_mempool == NULL)
2823 		goto Enomem;
2824 
2825 	drbd_ee_mempool = mempool_create(number,
2826 		mempool_alloc_slab, mempool_free_slab, drbd_ee_cache);
2827 	if (drbd_ee_mempool == NULL)
2828 		goto Enomem;
2829 
2830 	/* drbd's page pool */
2831 	spin_lock_init(&drbd_pp_lock);
2832 
2833 	for (i = 0; i < number; i++) {
2834 		page = alloc_page(GFP_HIGHUSER);
2835 		if (!page)
2836 			goto Enomem;
2837 		set_page_private(page, (unsigned long)drbd_pp_pool);
2838 		drbd_pp_pool = page;
2839 	}
2840 	drbd_pp_vacant = number;
2841 
2842 	return 0;
2843 
2844 Enomem:
2845 	drbd_destroy_mempools(); /* in case we allocated some */
2846 	return -ENOMEM;
2847 }
2848 
2849 static int drbd_notify_sys(struct notifier_block *this, unsigned long code,
2850 	void *unused)
2851 {
2852 	/* just so we have it.  you never know what interesting things we
2853 	 * might want to do here some day...
2854 	 */
2855 
2856 	return NOTIFY_DONE;
2857 }
2858 
2859 static struct notifier_block drbd_notifier = {
2860 	.notifier_call = drbd_notify_sys,
2861 };
2862 
2863 static void drbd_release_ee_lists(struct drbd_conf *mdev)
2864 {
2865 	int rr;
2866 
2867 	rr = drbd_release_ee(mdev, &mdev->active_ee);
2868 	if (rr)
2869 		dev_err(DEV, "%d EEs in active list found!\n", rr);
2870 
2871 	rr = drbd_release_ee(mdev, &mdev->sync_ee);
2872 	if (rr)
2873 		dev_err(DEV, "%d EEs in sync list found!\n", rr);
2874 
2875 	rr = drbd_release_ee(mdev, &mdev->read_ee);
2876 	if (rr)
2877 		dev_err(DEV, "%d EEs in read list found!\n", rr);
2878 
2879 	rr = drbd_release_ee(mdev, &mdev->done_ee);
2880 	if (rr)
2881 		dev_err(DEV, "%d EEs in done list found!\n", rr);
2882 
2883 	rr = drbd_release_ee(mdev, &mdev->net_ee);
2884 	if (rr)
2885 		dev_err(DEV, "%d EEs in net list found!\n", rr);
2886 }
2887 
2888 /* caution. no locking.
2889  * currently only used from module cleanup code. */
2890 static void drbd_delete_device(unsigned int minor)
2891 {
2892 	struct drbd_conf *mdev = minor_to_mdev(minor);
2893 
2894 	if (!mdev)
2895 		return;
2896 
2897 	/* paranoia asserts */
2898 	if (mdev->open_cnt != 0)
2899 		dev_err(DEV, "open_cnt = %d in %s:%u\n", mdev->open_cnt,
2900 				__FILE__ , __LINE__);
2901 
2902 	ERR_IF (!list_empty(&mdev->data.work.q)) {
2903 		struct list_head *lp;
2904 		list_for_each(lp, &mdev->data.work.q) {
2905 			dev_err(DEV, "lp = %p\n", lp);
2906 		}
2907 	};
2908 	/* end paranoia asserts */
2909 
2910 	del_gendisk(mdev->vdisk);
2911 
2912 	/* cleanup stuff that may have been allocated during
2913 	 * device (re-)configuration or state changes */
2914 
2915 	if (mdev->this_bdev)
2916 		bdput(mdev->this_bdev);
2917 
2918 	drbd_free_resources(mdev);
2919 
2920 	drbd_release_ee_lists(mdev);
2921 
2922 	/* should be free'd on disconnect? */
2923 	kfree(mdev->ee_hash);
2924 	/*
2925 	mdev->ee_hash_s = 0;
2926 	mdev->ee_hash = NULL;
2927 	*/
2928 
2929 	lc_destroy(mdev->act_log);
2930 	lc_destroy(mdev->resync);
2931 
2932 	kfree(mdev->p_uuid);
2933 	/* mdev->p_uuid = NULL; */
2934 
2935 	kfree(mdev->int_dig_out);
2936 	kfree(mdev->int_dig_in);
2937 	kfree(mdev->int_dig_vv);
2938 
2939 	/* cleanup the rest that has been
2940 	 * allocated from drbd_new_device
2941 	 * and actually free the mdev itself */
2942 	drbd_free_mdev(mdev);
2943 }
2944 
2945 static void drbd_cleanup(void)
2946 {
2947 	unsigned int i;
2948 
2949 	unregister_reboot_notifier(&drbd_notifier);
2950 
2951 	drbd_nl_cleanup();
2952 
2953 	if (minor_table) {
2954 		if (drbd_proc)
2955 			remove_proc_entry("drbd", NULL);
2956 		i = minor_count;
2957 		while (i--)
2958 			drbd_delete_device(i);
2959 		drbd_destroy_mempools();
2960 	}
2961 
2962 	kfree(minor_table);
2963 
2964 	unregister_blkdev(DRBD_MAJOR, "drbd");
2965 
2966 	printk(KERN_INFO "drbd: module cleanup done.\n");
2967 }
2968 
2969 /**
2970  * drbd_congested() - Callback for pdflush
2971  * @congested_data:	User data
2972  * @bdi_bits:		Bits pdflush is currently interested in
2973  *
2974  * Returns 1<<BDI_async_congested and/or 1<<BDI_sync_congested if we are congested.
2975  */
2976 static int drbd_congested(void *congested_data, int bdi_bits)
2977 {
2978 	struct drbd_conf *mdev = congested_data;
2979 	struct request_queue *q;
2980 	char reason = '-';
2981 	int r = 0;
2982 
2983 	if (!__inc_ap_bio_cond(mdev)) {
2984 		/* DRBD has frozen IO */
2985 		r = bdi_bits;
2986 		reason = 'd';
2987 		goto out;
2988 	}
2989 
2990 	if (get_ldev(mdev)) {
2991 		q = bdev_get_queue(mdev->ldev->backing_bdev);
2992 		r = bdi_congested(&q->backing_dev_info, bdi_bits);
2993 		put_ldev(mdev);
2994 		if (r)
2995 			reason = 'b';
2996 	}
2997 
2998 	if (bdi_bits & (1 << BDI_async_congested) && test_bit(NET_CONGESTED, &mdev->flags)) {
2999 		r |= (1 << BDI_async_congested);
3000 		reason = reason == 'b' ? 'a' : 'n';
3001 	}
3002 
3003 out:
3004 	mdev->congestion_reason = reason;
3005 	return r;
3006 }
3007 
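/* Allocate and wire up everything needed for one minor.  The allocation
 * order here is mirrored by the error unwinding below and by
 * drbd_free_mdev(), which tears the pieces down in reverse. */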
3008 struct drbd_conf *drbd_new_device(unsigned int minor)
3009 {
3010 	struct drbd_conf *mdev;
3011 	struct gendisk *disk;
3012 	struct request_queue *q;
3013 
3014 	/* GFP_KERNEL, we are outside of all write-out paths */
3015 	mdev = kzalloc(sizeof(struct drbd_conf), GFP_KERNEL);
3016 	if (!mdev)
3017 		return NULL;
3018 	if (!zalloc_cpumask_var(&mdev->cpu_mask, GFP_KERNEL))
3019 		goto out_no_cpumask;
3020 
3021 	mdev->minor = minor;
3022 
3023 	drbd_init_set_defaults(mdev);
3024 
3025 	q = blk_alloc_queue(GFP_KERNEL);
3026 	if (!q)
3027 		goto out_no_q;
3028 	mdev->rq_queue = q;
3029 	q->queuedata   = mdev;
3030 
3031 	disk = alloc_disk(1);
3032 	if (!disk)
3033 		goto out_no_disk;
3034 	mdev->vdisk = disk;
3035 
3036 	set_disk_ro(disk, TRUE);
3037 
3038 	disk->queue = q;
3039 	disk->major = DRBD_MAJOR;
3040 	disk->first_minor = minor;
3041 	disk->fops = &drbd_ops;
3042 	sprintf(disk->disk_name, "drbd%d", minor);
3043 	disk->private_data = mdev;
3044 
3045 	mdev->this_bdev = bdget(MKDEV(DRBD_MAJOR, minor));
3046 	/* we have no partitions. we contain only ourselves. */
3047 	mdev->this_bdev->bd_contains = mdev->this_bdev;
3048 
3049 	q->backing_dev_info.congested_fn = drbd_congested;
3050 	q->backing_dev_info.congested_data = mdev;
3051 
3052 	blk_queue_make_request(q, drbd_make_request_26);
3053 	blk_queue_max_segment_size(q, DRBD_MAX_SEGMENT_SIZE);
3054 	blk_queue_bounce_limit(q, BLK_BOUNCE_ANY);
3055 	blk_queue_merge_bvec(q, drbd_merge_bvec);
3056 	q->queue_lock = &mdev->req_lock; /* needed since we use */
3057 		/* plugging on a queue, that actually has no requests! */
3058 	q->unplug_fn = drbd_unplug_fn;
3059 
3060 	mdev->md_io_page = alloc_page(GFP_KERNEL);
3061 	if (!mdev->md_io_page)
3062 		goto out_no_io_page;
3063 
3064 	if (drbd_bm_init(mdev))
3065 		goto out_no_bitmap;
3066 	/* no need to lock access, we are still initializing this minor device. */
3067 	if (!tl_init(mdev))
3068 		goto out_no_tl;
3069 
3070 	mdev->app_reads_hash = kzalloc(APP_R_HSIZE*sizeof(void *), GFP_KERNEL);
3071 	if (!mdev->app_reads_hash)
3072 		goto out_no_app_reads;
3073 
3074 	mdev->current_epoch = kzalloc(sizeof(struct drbd_epoch), GFP_KERNEL);
3075 	if (!mdev->current_epoch)
3076 		goto out_no_epoch;
3077 
3078 	INIT_LIST_HEAD(&mdev->current_epoch->list);
3079 	mdev->epochs = 1;
3080 
3081 	return mdev;
3082 
3083 /* out_whatever_else:
3084 	kfree(mdev->current_epoch); */
3085 out_no_epoch:
3086 	kfree(mdev->app_reads_hash);
3087 out_no_app_reads:
3088 	tl_cleanup(mdev);
3089 out_no_tl:
3090 	drbd_bm_cleanup(mdev);
3091 out_no_bitmap:
3092 	__free_page(mdev->md_io_page);
3093 out_no_io_page:
3094 	put_disk(disk);
3095 out_no_disk:
3096 	blk_cleanup_queue(q);
3097 out_no_q:
3098 	free_cpumask_var(mdev->cpu_mask);
3099 out_no_cpumask:
3100 	kfree(mdev);
3101 	return NULL;
3102 }
3103 
3104 /* counterpart of drbd_new_device.
3105  * last part of drbd_delete_device. */
3106 void drbd_free_mdev(struct drbd_conf *mdev)
3107 {
3108 	kfree(mdev->current_epoch);
3109 	kfree(mdev->app_reads_hash);
3110 	tl_cleanup(mdev);
3111 	if (mdev->bitmap) /* should no longer be there. */
3112 		drbd_bm_cleanup(mdev);
3113 	__free_page(mdev->md_io_page);
3114 	put_disk(mdev->vdisk);
3115 	blk_cleanup_queue(mdev->rq_queue);
3116 	free_cpumask_var(mdev->cpu_mask);
3117 	kfree(mdev);
3118 }
3119 
3120 
3121 int __init drbd_init(void)
3122 {
3123 	int err;
3124 
3125 	if (sizeof(struct p_handshake) != 80) {
3126 		printk(KERN_ERR
3127 		       "drbd: never change the size or layout "
3128 		       "of the HandShake packet.\n");
3129 		return -EINVAL;
3130 	}
3131 
3132 	if (1 > minor_count || minor_count > 255) {
3133 		printk(KERN_ERR
3134 			"drbd: invalid minor_count (%d)\n", minor_count);
3135 #ifdef MODULE
3136 		return -EINVAL;
3137 #else
3138 		minor_count = 8;
3139 #endif
3140 	}
3141 
3142 	err = drbd_nl_init();
3143 	if (err)
3144 		return err;
3145 
3146 	err = register_blkdev(DRBD_MAJOR, "drbd");
3147 	if (err) {
3148 		printk(KERN_ERR
3149 		       "drbd: unable to register block device major %d\n",
3150 		       DRBD_MAJOR);
3151 		return err;
3152 	}
3153 
3154 	register_reboot_notifier(&drbd_notifier);
3155 
3156 	/*
3157 	 * allocate all necessary structs
3158 	 */
3159 	err = -ENOMEM;
3160 
3161 	init_waitqueue_head(&drbd_pp_wait);
3162 
3163 	drbd_proc = NULL; /* play safe for drbd_cleanup */
3164 	minor_table = kzalloc(sizeof(struct drbd_conf *)*minor_count,
3165 				GFP_KERNEL);
3166 	if (!minor_table)
3167 		goto Enomem;
3168 
3169 	err = drbd_create_mempools();
3170 	if (err)
3171 		goto Enomem;
3172 
3173 	drbd_proc = proc_create_data("drbd", S_IFREG | S_IRUGO , NULL, &drbd_proc_fops, NULL);
3174 	if (!drbd_proc)	{
3175 		printk(KERN_ERR "drbd: unable to register proc file\n");
3176 		goto Enomem;
3177 	}
3178 
3179 	rwlock_init(&global_state_lock);
3180 
3181 	printk(KERN_INFO "drbd: initialized. "
3182 	       "Version: " REL_VERSION " (api:%d/proto:%d-%d)\n",
3183 	       API_VERSION, PRO_VERSION_MIN, PRO_VERSION_MAX);
3184 	printk(KERN_INFO "drbd: %s\n", drbd_buildtag());
3185 	printk(KERN_INFO "drbd: registered as block device major %d\n",
3186 		DRBD_MAJOR);
3187 	printk(KERN_INFO "drbd: minor_table @ 0x%p\n", minor_table);
3188 
3189 	return 0; /* Success! */
3190 
3191 Enomem:
3192 	drbd_cleanup();
3193 	if (err == -ENOMEM)
3194 		/* currently always the case */
3195 		printk(KERN_ERR "drbd: ran out of memory\n");
3196 	else
3197 		printk(KERN_ERR "drbd: initialization failure\n");
3198 	return err;
3199 }
3200 
3201 void drbd_free_bc(struct drbd_backing_dev *ldev)
3202 {
3203 	if (ldev == NULL)
3204 		return;
3205 
3206 	bd_release(ldev->backing_bdev);
3207 	bd_release(ldev->md_bdev);
3208 
3209 	fput(ldev->lo_file);
3210 	fput(ldev->md_file);
3211 
3212 	kfree(ldev);
3213 }
3214 
3215 void drbd_free_sock(struct drbd_conf *mdev)
3216 {
3217 	if (mdev->data.socket) {
3218 		mutex_lock(&mdev->data.mutex);
3219 		kernel_sock_shutdown(mdev->data.socket, SHUT_RDWR);
3220 		sock_release(mdev->data.socket);
3221 		mdev->data.socket = NULL;
3222 		mutex_unlock(&mdev->data.mutex);
3223 	}
3224 	if (mdev->meta.socket) {
3225 		mutex_lock(&mdev->meta.mutex);
3226 		kernel_sock_shutdown(mdev->meta.socket, SHUT_RDWR);
3227 		sock_release(mdev->meta.socket);
3228 		mdev->meta.socket = NULL;
3229 		mutex_unlock(&mdev->meta.mutex);
3230 	}
3231 }
3232 
3233 
3234 void drbd_free_resources(struct drbd_conf *mdev)
3235 {
3236 	crypto_free_hash(mdev->csums_tfm);
3237 	mdev->csums_tfm = NULL;
3238 	crypto_free_hash(mdev->verify_tfm);
3239 	mdev->verify_tfm = NULL;
3240 	crypto_free_hash(mdev->cram_hmac_tfm);
3241 	mdev->cram_hmac_tfm = NULL;
3242 	crypto_free_hash(mdev->integrity_w_tfm);
3243 	mdev->integrity_w_tfm = NULL;
3244 	crypto_free_hash(mdev->integrity_r_tfm);
3245 	mdev->integrity_r_tfm = NULL;
3246 
3247 	drbd_free_sock(mdev);
3248 
3249 	__no_warn(local,
3250 		  drbd_free_bc(mdev->ldev);
3251 		  mdev->ldev = NULL;);
3252 }
3253 
3254 /* meta data management */
3255 
3256 struct meta_data_on_disk {
3257 	u64 la_size;           /* last agreed size. */
3258 	u64 uuid[UI_SIZE];   /* UUIDs. */
3259 	u64 device_uuid;
3260 	u64 reserved_u64_1;
3261 	u32 flags;             /* MDF */
3262 	u32 magic;
3263 	u32 md_size_sect;
3264 	u32 al_offset;         /* offset to this block */
3265 	u32 al_nr_extents;     /* important for restoring the AL */
3266 	      /* `-- act_log->nr_elements <-- sync_conf.al_extents */
3267 	u32 bm_offset;         /* offset to the bitmap, from here */
3268 	u32 bm_bytes_per_bit;  /* BM_BLOCK_SIZE */
3269 	u32 reserved_u32[4];
3270 
3271 } __packed;
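
/* All multi-byte fields of struct meta_data_on_disk are stored big endian
 * on disk (converted with the cpu_to_be and be_to_cpu helpers in
 * drbd_md_sync() and drbd_md_read()); the whole block fits within the
 * single 512 byte buffer written through mdev->md_io_page. */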
3272 
3273 /**
3274  * drbd_md_sync() - Writes the meta data super block if the MD_DIRTY flag bit is set
3275  * @mdev:	DRBD device.
3276  */
3277 void drbd_md_sync(struct drbd_conf *mdev)
3278 {
3279 	struct meta_data_on_disk *buffer;
3280 	sector_t sector;
3281 	int i;
3282 
3283 	if (!test_and_clear_bit(MD_DIRTY, &mdev->flags))
3284 		return;
3285 	del_timer(&mdev->md_sync_timer);
3286 
3287 	/* We use here D_FAILED and not D_ATTACHING because we try to write
3288 	 * metadata even if we detach due to a disk failure! */
3289 	if (!get_ldev_if_state(mdev, D_FAILED))
3290 		return;
3291 
3292 	mutex_lock(&mdev->md_io_mutex);
3293 	buffer = (struct meta_data_on_disk *)page_address(mdev->md_io_page);
3294 	memset(buffer, 0, 512);
3295 
3296 	buffer->la_size = cpu_to_be64(drbd_get_capacity(mdev->this_bdev));
3297 	for (i = UI_CURRENT; i < UI_SIZE; i++)
3298 		buffer->uuid[i] = cpu_to_be64(mdev->ldev->md.uuid[i]);
3299 	buffer->flags = cpu_to_be32(mdev->ldev->md.flags);
3300 	buffer->magic = cpu_to_be32(DRBD_MD_MAGIC);
3301 
3302 	buffer->md_size_sect  = cpu_to_be32(mdev->ldev->md.md_size_sect);
3303 	buffer->al_offset     = cpu_to_be32(mdev->ldev->md.al_offset);
3304 	buffer->al_nr_extents = cpu_to_be32(mdev->act_log->nr_elements);
3305 	buffer->bm_bytes_per_bit = cpu_to_be32(BM_BLOCK_SIZE);
3306 	buffer->device_uuid = cpu_to_be64(mdev->ldev->md.device_uuid);
3307 
3308 	buffer->bm_offset = cpu_to_be32(mdev->ldev->md.bm_offset);
3309 
3310 	D_ASSERT(drbd_md_ss__(mdev, mdev->ldev) == mdev->ldev->md.md_offset);
3311 	sector = mdev->ldev->md.md_offset;
3312 
3313 	if (drbd_md_sync_page_io(mdev, mdev->ldev, sector, WRITE)) {
3314 		clear_bit(MD_DIRTY, &mdev->flags);
3315 	} else {
3316 		/* this was a try anyway ... */
3317 		dev_err(DEV, "meta data update failed!\n");
3318 
3319 		drbd_chk_io_error(mdev, 1, TRUE);
3320 	}
3321 
3322 	/* Update mdev->ldev->md.la_size_sect,
3323 	 * since we updated it on metadata. */
3324 	mdev->ldev->md.la_size_sect = drbd_get_capacity(mdev->this_bdev);
3325 
3326 	mutex_unlock(&mdev->md_io_mutex);
3327 	put_ldev(mdev);
3328 }
3329 
3330 /**
3331  * drbd_md_read() - Reads in the meta data super block
3332  * @mdev:	DRBD device.
3333  * @bdev:	Device from which the meta data should be read in.
3334  *
3335  * Return 0 (NO_ERROR) on success, and an enum drbd_ret_codes in case
3336  * something goes wrong.  Currently only: ERR_IO_MD_DISK, ERR_MD_INVALID.
3337  */
3338 int drbd_md_read(struct drbd_conf *mdev, struct drbd_backing_dev *bdev)
3339 {
3340 	struct meta_data_on_disk *buffer;
3341 	int i, rv = NO_ERROR;
3342 
3343 	if (!get_ldev_if_state(mdev, D_ATTACHING))
3344 		return ERR_IO_MD_DISK;
3345 
3346 	mutex_lock(&mdev->md_io_mutex);
3347 	buffer = (struct meta_data_on_disk *)page_address(mdev->md_io_page);
3348 
3349 	if (!drbd_md_sync_page_io(mdev, bdev, bdev->md.md_offset, READ)) {
3350 		/* NOTE: can't do normal error processing here, as this is
3351 		   called BEFORE disk is attached */
3352 		dev_err(DEV, "Error while reading metadata.\n");
3353 		rv = ERR_IO_MD_DISK;
3354 		goto err;
3355 	}
3356 
3357 	if (be32_to_cpu(buffer->magic) != DRBD_MD_MAGIC) {
3358 		dev_err(DEV, "Error while reading metadata, magic not found.\n");
3359 		rv = ERR_MD_INVALID;
3360 		goto err;
3361 	}
3362 	if (be32_to_cpu(buffer->al_offset) != bdev->md.al_offset) {
3363 		dev_err(DEV, "unexpected al_offset: %d (expected %d)\n",
3364 		    be32_to_cpu(buffer->al_offset), bdev->md.al_offset);
3365 		rv = ERR_MD_INVALID;
3366 		goto err;
3367 	}
3368 	if (be32_to_cpu(buffer->bm_offset) != bdev->md.bm_offset) {
3369 		dev_err(DEV, "unexpected bm_offset: %d (expected %d)\n",
3370 		    be32_to_cpu(buffer->bm_offset), bdev->md.bm_offset);
3371 		rv = ERR_MD_INVALID;
3372 		goto err;
3373 	}
3374 	if (be32_to_cpu(buffer->md_size_sect) != bdev->md.md_size_sect) {
3375 		dev_err(DEV, "unexpected md_size: %u (expected %u)\n",
3376 		    be32_to_cpu(buffer->md_size_sect), bdev->md.md_size_sect);
3377 		rv = ERR_MD_INVALID;
3378 		goto err;
3379 	}
3380 
3381 	if (be32_to_cpu(buffer->bm_bytes_per_bit) != BM_BLOCK_SIZE) {
3382 		dev_err(DEV, "unexpected bm_bytes_per_bit: %u (expected %u)\n",
3383 		    be32_to_cpu(buffer->bm_bytes_per_bit), BM_BLOCK_SIZE);
3384 		rv = ERR_MD_INVALID;
3385 		goto err;
3386 	}
3387 
3388 	bdev->md.la_size_sect = be64_to_cpu(buffer->la_size);
3389 	for (i = UI_CURRENT; i < UI_SIZE; i++)
3390 		bdev->md.uuid[i] = be64_to_cpu(buffer->uuid[i]);
3391 	bdev->md.flags = be32_to_cpu(buffer->flags);
3392 	mdev->sync_conf.al_extents = be32_to_cpu(buffer->al_nr_extents);
3393 	bdev->md.device_uuid = be64_to_cpu(buffer->device_uuid);
3394 
3395 	if (mdev->sync_conf.al_extents < 7)
3396 		mdev->sync_conf.al_extents = 127;
3397 
3398  err:
3399 	mutex_unlock(&mdev->md_io_mutex);
3400 	put_ldev(mdev);
3401 
3402 	return rv;
3403 }
3404 
3405 /**
3406  * drbd_md_mark_dirty() - Mark meta data super block as dirty
3407  * @mdev:	DRBD device.
3408  *
3409  * Call this function if you change anything that should be written to
3410  * the meta-data super block. This function sets MD_DIRTY and starts a
3411  * timer that ensures drbd_md_sync() gets called within five seconds.
3412  */
3413 void drbd_md_mark_dirty(struct drbd_conf *mdev)
3414 {
3415 	set_bit(MD_DIRTY, &mdev->flags);
3416 	mod_timer(&mdev->md_sync_timer, jiffies + 5*HZ);
3417 }
3418 
3419 
3420 static void drbd_uuid_move_history(struct drbd_conf *mdev) __must_hold(local)
3421 {
3422 	int i;
3423 
3424 	for (i = UI_HISTORY_START; i < UI_HISTORY_END; i++)
3425 		mdev->ldev->md.uuid[i+1] = mdev->ldev->md.uuid[i];
3426 }
3427 
3428 void _drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
3429 {
3430 	if (idx == UI_CURRENT) {
3431 		if (mdev->state.role == R_PRIMARY)
3432 			val |= 1;
3433 		else
3434 			val &= ~((u64)1);
3435 
3436 		drbd_set_ed_uuid(mdev, val);
3437 	}
3438 
3439 	mdev->ldev->md.uuid[idx] = val;
3440 	drbd_md_mark_dirty(mdev);
3441 }
3442 
3443 
3444 void drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
3445 {
3446 	if (mdev->ldev->md.uuid[idx]) {
3447 		drbd_uuid_move_history(mdev);
3448 		mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[idx];
3449 	}
3450 	_drbd_uuid_set(mdev, idx, val);
3451 }
3452 
3453 /**
3454  * drbd_uuid_new_current() - Creates a new current UUID
3455  * @mdev:	DRBD device.
3456  *
3457  * Creates a new current UUID, and rotates the old current UUID into
3458  * the bitmap slot. Causes an incremental resync upon next connect.
3459  */
3460 void drbd_uuid_new_current(struct drbd_conf *mdev) __must_hold(local)
3461 {
3462 	u64 val;
3463 
3464 	dev_info(DEV, "Creating new current UUID\n");
3465 	D_ASSERT(mdev->ldev->md.uuid[UI_BITMAP] == 0);
3466 	mdev->ldev->md.uuid[UI_BITMAP] = mdev->ldev->md.uuid[UI_CURRENT];
3467 
3468 	get_random_bytes(&val, sizeof(u64));
3469 	_drbd_uuid_set(mdev, UI_CURRENT, val);
3470 }
3471 
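/* Set or clear the bitmap UUID.  val == 0 means "forget it": the old
 * bitmap UUID is rotated into the history slots first.  A non-zero val is
 * stored with bit 0 cleared; bit 0 apparently serves as the "generated
 * while primary" marker (compare _drbd_uuid_set()). */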
3472 void drbd_uuid_set_bm(struct drbd_conf *mdev, u64 val) __must_hold(local)
3473 {
3474 	if (mdev->ldev->md.uuid[UI_BITMAP] == 0 && val == 0)
3475 		return;
3476 
3477 	if (val == 0) {
3478 		drbd_uuid_move_history(mdev);
3479 		mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[UI_BITMAP];
3480 		mdev->ldev->md.uuid[UI_BITMAP] = 0;
3481 	} else {
3482 		if (mdev->ldev->md.uuid[UI_BITMAP])
3483 			dev_warn(DEV, "bm UUID already set\n");
3484 
3485 		mdev->ldev->md.uuid[UI_BITMAP] = val;
3486 		mdev->ldev->md.uuid[UI_BITMAP] &= ~((u64)1);
3487 
3488 	}
3489 	drbd_md_mark_dirty(mdev);
3490 }
3491 
3492 /**
3493  * drbd_bmio_set_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
3494  * @mdev:	DRBD device.
3495  *
3496  * Sets all bits in the bitmap and writes the whole bitmap to stable storage.
3497  */
3498 int drbd_bmio_set_n_write(struct drbd_conf *mdev)
3499 {
3500 	int rv = -EIO;
3501 
3502 	if (get_ldev_if_state(mdev, D_ATTACHING)) {
3503 		drbd_md_set_flag(mdev, MDF_FULL_SYNC);
3504 		drbd_md_sync(mdev);
3505 		drbd_bm_set_all(mdev);
3506 
3507 		rv = drbd_bm_write(mdev);
3508 
3509 		if (!rv) {
3510 			drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
3511 			drbd_md_sync(mdev);
3512 		}
3513 
3514 		put_ldev(mdev);
3515 	}
3516 
3517 	return rv;
3518 }
3519 
3520 /**
3521  * drbd_bmio_clear_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
3522  * @mdev:	DRBD device.
3523  *
3524  * Clears all bits in the bitmap and writes the whole bitmap to stable storage.
3525  */
3526 int drbd_bmio_clear_n_write(struct drbd_conf *mdev)
3527 {
3528 	int rv = -EIO;
3529 
3530 	if (get_ldev_if_state(mdev, D_ATTACHING)) {
3531 		drbd_bm_clear_all(mdev);
3532 		rv = drbd_bm_write(mdev);
3533 		put_ldev(mdev);
3534 	}
3535 
3536 	return rv;
3537 }
3538 
3539 static int w_bitmap_io(struct drbd_conf *mdev, struct drbd_work *w, int unused)
3540 {
3541 	struct bm_io_work *work = container_of(w, struct bm_io_work, w);
3542 	int rv;
3543 
3544 	D_ASSERT(atomic_read(&mdev->ap_bio_cnt) == 0);
3545 
3546 	drbd_bm_lock(mdev, work->why);
3547 	rv = work->io_fn(mdev);
3548 	drbd_bm_unlock(mdev);
3549 
3550 	clear_bit(BITMAP_IO, &mdev->flags);
3551 	wake_up(&mdev->misc_wait);
3552 
3553 	if (work->done)
3554 		work->done(mdev, rv);
3555 
3556 	clear_bit(BITMAP_IO_QUEUED, &mdev->flags);
3557 	work->why = NULL;
3558 
3559 	return 1;
3560 }
3561 
3562 /**
3563  * drbd_queue_bitmap_io() - Queues an IO operation on the whole bitmap
3564  * @mdev:	DRBD device.
3565  * @io_fn:	IO callback to be called when bitmap IO is possible
3566  * @done:	callback to be called after the bitmap IO was performed
3567  * @why:	Descriptive text of the reason for doing the IO
3568  *
3569  * While IO on the bitmap happens we freeze application IO, thus ensuring
3570  * that drbd_set_out_of_sync() cannot be called. This function MAY ONLY be
3571  * called from worker context. It MUST NOT be used while a previous such
3572  * work is still pending!
3573  */
3574 void drbd_queue_bitmap_io(struct drbd_conf *mdev,
3575 			  int (*io_fn)(struct drbd_conf *),
3576 			  void (*done)(struct drbd_conf *, int),
3577 			  char *why)
3578 {
3579 	D_ASSERT(current == mdev->worker.task);
3580 
3581 	D_ASSERT(!test_bit(BITMAP_IO_QUEUED, &mdev->flags));
3582 	D_ASSERT(!test_bit(BITMAP_IO, &mdev->flags));
3583 	D_ASSERT(list_empty(&mdev->bm_io_work.w.list));
3584 	if (mdev->bm_io_work.why)
3585 		dev_err(DEV, "FIXME going to queue '%s' but '%s' still pending?\n",
3586 			why, mdev->bm_io_work.why);
3587 
3588 	mdev->bm_io_work.io_fn = io_fn;
3589 	mdev->bm_io_work.done = done;
3590 	mdev->bm_io_work.why = why;
3591 
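	/* If application IO is still in flight, only BITMAP_IO is set here;
	 * the work is then expected to be queued from the completion path
	 * (dec_ap_bio()) once ap_bio_cnt drops to zero. */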
3592 	set_bit(BITMAP_IO, &mdev->flags);
3593 	if (atomic_read(&mdev->ap_bio_cnt) == 0) {
3594 		if (list_empty(&mdev->bm_io_work.w.list)) {
3595 			set_bit(BITMAP_IO_QUEUED, &mdev->flags);
3596 			drbd_queue_work(&mdev->data.work, &mdev->bm_io_work.w);
3597 		} else
3598 			dev_err(DEV, "FIXME avoided double queuing bm_io_work\n");
3599 	}
3600 }
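/*
 * Usage sketch (hypothetical caller, illustration only): from worker context,
 * queue a full-bitmap write using drbd_bmio_set_n_write() as io_fn and a
 * made-up completion callback; the "why" string ends up in drbd_bm_lock()
 * and in the error messages above.
 *
 *	static void example_done(struct drbd_conf *mdev, int rv)
 *	{
 *		if (rv)
 *			dev_err(DEV, "bitmap write failed: %d\n", rv);
 *	}
 *
 *	drbd_queue_bitmap_io(mdev, &drbd_bmio_set_n_write,
 *			     &example_done, "example: set_n_write");
 */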
3601 
3602 /**
3603  * drbd_bitmap_io() -  Does an IO operation on the whole bitmap
3604  * @mdev:	DRBD device.
3605  * @io_fn:	IO callback to be called when bitmap IO is possible
3606  * @why:	Descriptive text of the reason for doing the IO
3607  *
3608  * Freezes application IO while the actual IO operation runs. This
3609  * function MUST NOT be called from worker context.
3610  */
3611 int drbd_bitmap_io(struct drbd_conf *mdev, int (*io_fn)(struct drbd_conf *), char *why)
3612 {
3613 	int rv;
3614 
3615 	D_ASSERT(current != mdev->worker.task);
3616 
3617 	drbd_suspend_io(mdev);
3618 
3619 	drbd_bm_lock(mdev, why);
3620 	rv = io_fn(mdev);
3621 	drbd_bm_unlock(mdev);
3622 
3623 	drbd_resume_io(mdev);
3624 
3625 	return rv;
3626 }
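/*
 * Usage sketch (illustration only, caller not part of this file): from any
 * context except the worker, run a full-bitmap operation synchronously:
 *
 *	int rv = drbd_bitmap_io(mdev, &drbd_bmio_set_n_write,
 *				"example: set_n_write");
 *	if (rv)
 *		dev_err(DEV, "bitmap IO failed: %d\n", rv);
 */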
3627 
3628 void drbd_md_set_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
3629 {
3630 	if ((mdev->ldev->md.flags & flag) != flag) {
3631 		drbd_md_mark_dirty(mdev);
3632 		mdev->ldev->md.flags |= flag;
3633 	}
3634 }
3635 
3636 void drbd_md_clear_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
3637 {
3638 	if ((mdev->ldev->md.flags & flag) != 0) {
3639 		drbd_md_mark_dirty(mdev);
3640 		mdev->ldev->md.flags &= ~flag;
3641 	}
3642 }

3643 int drbd_md_test_flag(struct drbd_backing_dev *bdev, int flag)
3644 {
3645 	return (bdev->md.flags & flag) != 0;
3646 }
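/*
 * Sketch of the usual pattern around these helpers (compare
 * drbd_bmio_set_n_write() above): set a meta-data flag, make it persistent,
 * and later test it on the backing device (mdev->ldev, with a local
 * reference held):
 *
 *	drbd_md_set_flag(mdev, MDF_FULL_SYNC);
 *	drbd_md_sync(mdev);
 *	...
 *	if (drbd_md_test_flag(mdev->ldev, MDF_FULL_SYNC))
 *		...;
 */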
3647 
3648 static void md_sync_timer_fn(unsigned long data)
3649 {
3650 	struct drbd_conf *mdev = (struct drbd_conf *) data;
3651 
3652 	drbd_queue_work_front(&mdev->data.work, &mdev->md_sync_work);
3653 }
3654 
3655 static int w_md_sync(struct drbd_conf *mdev, struct drbd_work *w, int unused)
3656 {
3657 	dev_warn(DEV, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
3658 	drbd_md_sync(mdev);
3659 
3660 	return 1;
3661 }
3662 
3663 #ifdef CONFIG_DRBD_FAULT_INJECTION
3664 /* Fault insertion support including random number generator shamelessly
3665  * stolen from kernel/rcutorture.c */
3666 struct fault_random_state {
3667 	unsigned long state;
3668 	unsigned long count;
3669 };
3670 
3671 #define FAULT_RANDOM_MULT 39916801  /* prime */
3672 #define FAULT_RANDOM_ADD	479001701 /* prime */
3673 #define FAULT_RANDOM_REFRESH 10000
3674 
3675 /*
3676  * Crude but fast random-number generator.  Uses a linear congruential
3677  * generator, with occasional help from get_random_bytes().
3678  */
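/*
 * In other words (illustrative): with M = FAULT_RANDOM_MULT and
 * A = FAULT_RANDOM_ADD, the function below iterates
 *
 *	state = state * M + A		(mod 2^BITS_PER_LONG)
 *
 * perturbing the state with get_random_bytes() every FAULT_RANDOM_REFRESH
 * calls, and returns the (32-bit truncated) state with its 16-bit halfwords
 * swapped (swahw32()), presumably so that the better mixed high-order bits
 * land in the low bits that feed the "% 100" in _drbd_insert_fault().
 */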
3679 static unsigned long
3680 _drbd_fault_random(struct fault_random_state *rsp)
3681 {
3682 	long refresh;
3683 
3684 	if (!rsp->count--) {
3685 		get_random_bytes(&refresh, sizeof(refresh));
3686 		rsp->state += refresh;
3687 		rsp->count = FAULT_RANDOM_REFRESH;
3688 	}
3689 	rsp->state = rsp->state * FAULT_RANDOM_MULT + FAULT_RANDOM_ADD;
3690 	return swahw32(rsp->state);
3691 }
3692 
3693 static char *
3694 _drbd_fault_str(unsigned int type) {
3695 	static char *_faults[] = {
3696 		[DRBD_FAULT_MD_WR] = "Meta-data write",
3697 		[DRBD_FAULT_MD_RD] = "Meta-data read",
3698 		[DRBD_FAULT_RS_WR] = "Resync write",
3699 		[DRBD_FAULT_RS_RD] = "Resync read",
3700 		[DRBD_FAULT_DT_WR] = "Data write",
3701 		[DRBD_FAULT_DT_RD] = "Data read",
3702 		[DRBD_FAULT_DT_RA] = "Data read ahead",
3703 		[DRBD_FAULT_BM_ALLOC] = "BM allocation",
3704 		[DRBD_FAULT_AL_EE] = "EE allocation",
3705 		[DRBD_FAULT_RECEIVE] = "receive data corruption",
3706 	};
3707 
3708 	return (type < DRBD_FAULT_MAX) ? _faults[type] : "**Unknown**";
3709 }
3710 
3711 unsigned int
3712 _drbd_insert_fault(struct drbd_conf *mdev, unsigned int type)
3713 {
3714 	static struct fault_random_state rrs = {0, 0};
3715 
3716 	unsigned int ret = (
3717 		(fault_devs == 0 ||
3718 			((1 << mdev_to_minor(mdev)) & fault_devs) != 0) &&
3719 		(((_drbd_fault_random(&rrs) % 100) + 1) <= fault_rate));
3720 
3721 	if (ret) {
3722 		fault_count++;
3723 
3724 		if (__ratelimit(&drbd_ratelimit_state))
3725 			dev_warn(DEV, "***Simulating %s failure\n",
3726 				_drbd_fault_str(type));
3727 	}
3728 
3729 	return ret;
3730 }
3731 #endif
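/*
 * Usage sketch (illustration only; the wrapper is assumed to be the
 * drbd_insert_fault() helper from drbd_int.h, which compiles to 0 without
 * CONFIG_DRBD_FAULT_INJECTION): fail a data write with probability
 * fault_rate percent on the minors selected by fault_devs (all, if zero):
 *
 *	if (drbd_insert_fault(mdev, DRBD_FAULT_DT_WR))
 *		bio_endio(bio, -EIO);
 *	else
 *		generic_make_request(bio);
 */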
3732 
3733 const char *drbd_buildtag(void)
3734 {
3735 	/* A DRBD module built from external sources carries here a reference
3736 	   to the git hash of its source code; built-in, it says "built-in". */
3737 
3738 	static char buildtag[38] = "\0uilt-in";
3739 
3740 	if (buildtag[0] == 0) {
3741 #ifdef CONFIG_MODULES
3742 		if (THIS_MODULE != NULL)
3743 			sprintf(buildtag, "srcversion: %-24s", THIS_MODULE->srcversion);
3744 		else
3745 #endif
3746 			buildtag[0] = 'b';
3747 	}
3748 
3749 	return buildtag;
3750 }
3751 
3752 module_init(drbd_init)
3753 module_exit(drbd_cleanup)
3754 
3755 EXPORT_SYMBOL(drbd_conn_str);
3756 EXPORT_SYMBOL(drbd_role_str);
3757 EXPORT_SYMBOL(drbd_disk_str);
3758 EXPORT_SYMBOL(drbd_set_st_err_str);
3759