xref: /linux/net/ceph/osdmap.c (revision e5a52fd2b8cdb700b3c07b030e050a49ef3156b9)
1 // SPDX-License-Identifier: GPL-2.0
2 
3 #include <linux/ceph/ceph_debug.h>
4 
5 #include <linux/module.h>
6 #include <linux/slab.h>
7 
8 #include <linux/ceph/libceph.h>
9 #include <linux/ceph/osdmap.h>
10 #include <linux/ceph/decode.h>
11 #include <linux/crush/hash.h>
12 #include <linux/crush/mapper.h>
13 
14 char *ceph_osdmap_state_str(char *str, int len, u32 state)
15 {
16 	if (!len)
17 		return str;
18 
19 	if ((state & CEPH_OSD_EXISTS) && (state & CEPH_OSD_UP))
20 		snprintf(str, len, "exists, up");
21 	else if (state & CEPH_OSD_EXISTS)
22 		snprintf(str, len, "exists");
23 	else if (state & CEPH_OSD_UP)
24 		snprintf(str, len, "up");
25 	else
26 		snprintf(str, len, "doesn't exist");
27 
28 	return str;
29 }
30 
31 /* maps */
32 
/*
 * Number of bits needed to represent @t: the 1-based position of its
 * highest set bit, or 0 when @t is 0.
 */
static int calc_bits_of(unsigned int t)
{
	int bits;

	for (bits = 0; t != 0; t >>= 1)
		bits++;

	return bits;
}
42 
43 /*
44  * the foo_mask is the smallest value 2^n-1 that is >= foo.
45  */
46 static void calc_pg_masks(struct ceph_pg_pool_info *pi)
47 {
48 	pi->pg_num_mask = (1 << calc_bits_of(pi->pg_num-1)) - 1;
49 	pi->pgp_num_mask = (1 << calc_bits_of(pi->pgp_num-1)) - 1;
50 }
51 
52 /*
53  * decode crush map
54  */
55 static int crush_decode_uniform_bucket(void **p, void *end,
56 				       struct crush_bucket_uniform *b)
57 {
58 	dout("crush_decode_uniform_bucket %p to %p\n", *p, end);
59 	ceph_decode_need(p, end, (1+b->h.size) * sizeof(u32), bad);
60 	b->item_weight = ceph_decode_32(p);
61 	return 0;
62 bad:
63 	return -EINVAL;
64 }
65 
66 static int crush_decode_list_bucket(void **p, void *end,
67 				    struct crush_bucket_list *b)
68 {
69 	int j;
70 	dout("crush_decode_list_bucket %p to %p\n", *p, end);
71 	b->item_weights = kcalloc(b->h.size, sizeof(u32), GFP_NOFS);
72 	if (b->item_weights == NULL)
73 		return -ENOMEM;
74 	b->sum_weights = kcalloc(b->h.size, sizeof(u32), GFP_NOFS);
75 	if (b->sum_weights == NULL)
76 		return -ENOMEM;
77 	ceph_decode_need(p, end, 2 * b->h.size * sizeof(u32), bad);
78 	for (j = 0; j < b->h.size; j++) {
79 		b->item_weights[j] = ceph_decode_32(p);
80 		b->sum_weights[j] = ceph_decode_32(p);
81 	}
82 	return 0;
83 bad:
84 	return -EINVAL;
85 }
86 
87 static int crush_decode_tree_bucket(void **p, void *end,
88 				    struct crush_bucket_tree *b)
89 {
90 	int j;
91 	dout("crush_decode_tree_bucket %p to %p\n", *p, end);
92 	ceph_decode_8_safe(p, end, b->num_nodes, bad);
93 	b->node_weights = kcalloc(b->num_nodes, sizeof(u32), GFP_NOFS);
94 	if (b->node_weights == NULL)
95 		return -ENOMEM;
96 	ceph_decode_need(p, end, b->num_nodes * sizeof(u32), bad);
97 	for (j = 0; j < b->num_nodes; j++)
98 		b->node_weights[j] = ceph_decode_32(p);
99 	return 0;
100 bad:
101 	return -EINVAL;
102 }
103 
104 static int crush_decode_straw_bucket(void **p, void *end,
105 				     struct crush_bucket_straw *b)
106 {
107 	int j;
108 	dout("crush_decode_straw_bucket %p to %p\n", *p, end);
109 	b->item_weights = kcalloc(b->h.size, sizeof(u32), GFP_NOFS);
110 	if (b->item_weights == NULL)
111 		return -ENOMEM;
112 	b->straws = kcalloc(b->h.size, sizeof(u32), GFP_NOFS);
113 	if (b->straws == NULL)
114 		return -ENOMEM;
115 	ceph_decode_need(p, end, 2 * b->h.size * sizeof(u32), bad);
116 	for (j = 0; j < b->h.size; j++) {
117 		b->item_weights[j] = ceph_decode_32(p);
118 		b->straws[j] = ceph_decode_32(p);
119 	}
120 	return 0;
121 bad:
122 	return -EINVAL;
123 }
124 
125 static int crush_decode_straw2_bucket(void **p, void *end,
126 				      struct crush_bucket_straw2 *b)
127 {
128 	int j;
129 	dout("crush_decode_straw2_bucket %p to %p\n", *p, end);
130 	b->item_weights = kcalloc(b->h.size, sizeof(u32), GFP_NOFS);
131 	if (b->item_weights == NULL)
132 		return -ENOMEM;
133 	ceph_decode_need(p, end, b->h.size * sizeof(u32), bad);
134 	for (j = 0; j < b->h.size; j++)
135 		b->item_weights[j] = ceph_decode_32(p);
136 	return 0;
137 bad:
138 	return -EINVAL;
139 }
140 
/*
 * Entry of an rbtree that associates a crush id with a name.
 */
struct crush_name_node {
	struct rb_node cn_node;	/* linkage into the name tree */
	int cn_id;		/* id decoded from the map; the tree key */
	char cn_name[];		/* NUL-terminated name stored inline */
};
146 
147 static struct crush_name_node *alloc_crush_name(size_t name_len)
148 {
149 	struct crush_name_node *cn;
150 
151 	cn = kmalloc(sizeof(*cn) + name_len + 1, GFP_NOIO);
152 	if (!cn)
153 		return NULL;
154 
155 	RB_CLEAR_NODE(&cn->cn_node);
156 	return cn;
157 }
158 
159 static void free_crush_name(struct crush_name_node *cn)
160 {
161 	WARN_ON(!RB_EMPTY_NODE(&cn->cn_node));
162 
163 	kfree(cn);
164 }
165 
/*
 * rbtree helpers for crush_name_node, keyed by cn_id.  This file uses
 * __insert_crush_name() and erase_crush_name() from the generated set.
 */
DEFINE_RB_FUNCS(crush_name, struct crush_name_node, cn_id, cn_node)
167 
168 static int decode_crush_names(void **p, void *end, struct rb_root *root)
169 {
170 	u32 n;
171 
172 	ceph_decode_32_safe(p, end, n, e_inval);
173 	while (n--) {
174 		struct crush_name_node *cn;
175 		int id;
176 		u32 name_len;
177 
178 		ceph_decode_32_safe(p, end, id, e_inval);
179 		ceph_decode_32_safe(p, end, name_len, e_inval);
180 		ceph_decode_need(p, end, name_len, e_inval);
181 
182 		cn = alloc_crush_name(name_len);
183 		if (!cn)
184 			return -ENOMEM;
185 
186 		cn->cn_id = id;
187 		memcpy(cn->cn_name, *p, name_len);
188 		cn->cn_name[name_len] = '\0';
189 		*p += name_len;
190 
191 		if (!__insert_crush_name(root, cn)) {
192 			free_crush_name(cn);
193 			return -EEXIST;
194 		}
195 	}
196 
197 	return 0;
198 
199 e_inval:
200 	return -EINVAL;
201 }
202 
203 void clear_crush_names(struct rb_root *root)
204 {
205 	while (!RB_EMPTY_ROOT(root)) {
206 		struct crush_name_node *cn =
207 		    rb_entry(rb_first(root), struct crush_name_node, cn_node);
208 
209 		erase_crush_name(root, cn);
210 		free_crush_name(cn);
211 	}
212 }
213 
214 static struct crush_choose_arg_map *alloc_choose_arg_map(void)
215 {
216 	struct crush_choose_arg_map *arg_map;
217 
218 	arg_map = kzalloc(sizeof(*arg_map), GFP_NOIO);
219 	if (!arg_map)
220 		return NULL;
221 
222 	RB_CLEAR_NODE(&arg_map->node);
223 	return arg_map;
224 }
225 
226 static void free_choose_arg_map(struct crush_choose_arg_map *arg_map)
227 {
228 	if (arg_map) {
229 		int i, j;
230 
231 		WARN_ON(!RB_EMPTY_NODE(&arg_map->node));
232 
233 		for (i = 0; i < arg_map->size; i++) {
234 			struct crush_choose_arg *arg = &arg_map->args[i];
235 
236 			for (j = 0; j < arg->weight_set_size; j++)
237 				kfree(arg->weight_set[j].weights);
238 			kfree(arg->weight_set);
239 			kfree(arg->ids);
240 		}
241 		kfree(arg_map->args);
242 		kfree(arg_map);
243 	}
244 }
245 
/*
 * rbtree helpers for crush_choose_arg_map, keyed by choose_args_index.
 * This file uses insert_choose_arg_map() and erase_choose_arg_map()
 * from the generated set.
 */
DEFINE_RB_FUNCS(choose_arg_map, struct crush_choose_arg_map, choose_args_index,
		node);
248 
249 void clear_choose_args(struct crush_map *c)
250 {
251 	while (!RB_EMPTY_ROOT(&c->choose_args)) {
252 		struct crush_choose_arg_map *arg_map =
253 		    rb_entry(rb_first(&c->choose_args),
254 			     struct crush_choose_arg_map, node);
255 
256 		erase_choose_arg_map(&c->choose_args, arg_map);
257 		free_choose_arg_map(arg_map);
258 	}
259 }
260 
261 static u32 *decode_array_32_alloc(void **p, void *end, u32 *plen)
262 {
263 	u32 *a = NULL;
264 	u32 len;
265 	int ret;
266 
267 	ceph_decode_32_safe(p, end, len, e_inval);
268 	if (len) {
269 		u32 i;
270 
271 		a = kmalloc_array(len, sizeof(u32), GFP_NOIO);
272 		if (!a) {
273 			ret = -ENOMEM;
274 			goto fail;
275 		}
276 
277 		ceph_decode_need(p, end, len * sizeof(u32), e_inval);
278 		for (i = 0; i < len; i++)
279 			a[i] = ceph_decode_32(p);
280 	}
281 
282 	*plen = len;
283 	return a;
284 
285 e_inval:
286 	ret = -EINVAL;
287 fail:
288 	kfree(a);
289 	return ERR_PTR(ret);
290 }
291 
292 /*
293  * Assumes @arg is zero-initialized.
294  */
295 static int decode_choose_arg(void **p, void *end, struct crush_choose_arg *arg)
296 {
297 	int ret;
298 
299 	ceph_decode_32_safe(p, end, arg->weight_set_size, e_inval);
300 	if (arg->weight_set_size) {
301 		u32 i;
302 
303 		arg->weight_set = kmalloc_array(arg->weight_set_size,
304 						sizeof(*arg->weight_set),
305 						GFP_NOIO);
306 		if (!arg->weight_set)
307 			return -ENOMEM;
308 
309 		for (i = 0; i < arg->weight_set_size; i++) {
310 			struct crush_weight_set *w = &arg->weight_set[i];
311 
312 			w->weights = decode_array_32_alloc(p, end, &w->size);
313 			if (IS_ERR(w->weights)) {
314 				ret = PTR_ERR(w->weights);
315 				w->weights = NULL;
316 				return ret;
317 			}
318 		}
319 	}
320 
321 	arg->ids = decode_array_32_alloc(p, end, &arg->ids_size);
322 	if (IS_ERR(arg->ids)) {
323 		ret = PTR_ERR(arg->ids);
324 		arg->ids = NULL;
325 		return ret;
326 	}
327 
328 	return 0;
329 
330 e_inval:
331 	return -EINVAL;
332 }
333 
334 static int decode_choose_args(void **p, void *end, struct crush_map *c)
335 {
336 	struct crush_choose_arg_map *arg_map = NULL;
337 	u32 num_choose_arg_maps, num_buckets;
338 	int ret;
339 
340 	ceph_decode_32_safe(p, end, num_choose_arg_maps, e_inval);
341 	while (num_choose_arg_maps--) {
342 		arg_map = alloc_choose_arg_map();
343 		if (!arg_map) {
344 			ret = -ENOMEM;
345 			goto fail;
346 		}
347 
348 		ceph_decode_64_safe(p, end, arg_map->choose_args_index,
349 				    e_inval);
350 		arg_map->size = c->max_buckets;
351 		arg_map->args = kcalloc(arg_map->size, sizeof(*arg_map->args),
352 					GFP_NOIO);
353 		if (!arg_map->args) {
354 			ret = -ENOMEM;
355 			goto fail;
356 		}
357 
358 		ceph_decode_32_safe(p, end, num_buckets, e_inval);
359 		while (num_buckets--) {
360 			struct crush_choose_arg *arg;
361 			u32 bucket_index;
362 
363 			ceph_decode_32_safe(p, end, bucket_index, e_inval);
364 			if (bucket_index >= arg_map->size)
365 				goto e_inval;
366 
367 			arg = &arg_map->args[bucket_index];
368 			ret = decode_choose_arg(p, end, arg);
369 			if (ret)
370 				goto fail;
371 
372 			if (arg->ids_size &&
373 			    arg->ids_size != c->buckets[bucket_index]->size)
374 				goto e_inval;
375 		}
376 
377 		insert_choose_arg_map(&c->choose_args, arg_map);
378 	}
379 
380 	return 0;
381 
382 e_inval:
383 	ret = -EINVAL;
384 fail:
385 	free_choose_arg_map(arg_map);
386 	return ret;
387 }
388 
389 static void crush_finalize(struct crush_map *c)
390 {
391 	__s32 b;
392 
393 	/* Space for the array of pointers to per-bucket workspace */
394 	c->working_size = sizeof(struct crush_work) +
395 	    c->max_buckets * sizeof(struct crush_work_bucket *);
396 
397 	for (b = 0; b < c->max_buckets; b++) {
398 		if (!c->buckets[b])
399 			continue;
400 
401 		switch (c->buckets[b]->alg) {
402 		default:
403 			/*
404 			 * The base case, permutation variables and
405 			 * the pointer to the permutation array.
406 			 */
407 			c->working_size += sizeof(struct crush_work_bucket);
408 			break;
409 		}
410 		/* Every bucket has a permutation array. */
411 		c->working_size += c->buckets[b]->size * sizeof(__u32);
412 	}
413 }
414 
415 static struct crush_map *crush_decode(void *pbyval, void *end)
416 {
417 	struct crush_map *c;
418 	int err;
419 	int i, j;
420 	void **p = &pbyval;
421 	void *start = pbyval;
422 	u32 magic;
423 
424 	dout("crush_decode %p to %p len %d\n", *p, end, (int)(end - *p));
425 
426 	c = kzalloc(sizeof(*c), GFP_NOFS);
427 	if (c == NULL)
428 		return ERR_PTR(-ENOMEM);
429 
430 	c->type_names = RB_ROOT;
431 	c->names = RB_ROOT;
432 	c->choose_args = RB_ROOT;
433 
434         /* set tunables to default values */
435         c->choose_local_tries = 2;
436         c->choose_local_fallback_tries = 5;
437         c->choose_total_tries = 19;
438 	c->chooseleaf_descend_once = 0;
439 
440 	ceph_decode_need(p, end, 4*sizeof(u32), bad);
441 	magic = ceph_decode_32(p);
442 	if (magic != CRUSH_MAGIC) {
443 		pr_err("crush_decode magic %x != current %x\n",
444 		       (unsigned int)magic, (unsigned int)CRUSH_MAGIC);
445 		goto bad;
446 	}
447 	c->max_buckets = ceph_decode_32(p);
448 	c->max_rules = ceph_decode_32(p);
449 	c->max_devices = ceph_decode_32(p);
450 
451 	c->buckets = kcalloc(c->max_buckets, sizeof(*c->buckets), GFP_NOFS);
452 	if (c->buckets == NULL)
453 		goto badmem;
454 	c->rules = kcalloc(c->max_rules, sizeof(*c->rules), GFP_NOFS);
455 	if (c->rules == NULL)
456 		goto badmem;
457 
458 	/* buckets */
459 	for (i = 0; i < c->max_buckets; i++) {
460 		int size = 0;
461 		u32 alg;
462 		struct crush_bucket *b;
463 
464 		ceph_decode_32_safe(p, end, alg, bad);
465 		if (alg == 0) {
466 			c->buckets[i] = NULL;
467 			continue;
468 		}
469 		dout("crush_decode bucket %d off %x %p to %p\n",
470 		     i, (int)(*p-start), *p, end);
471 
472 		switch (alg) {
473 		case CRUSH_BUCKET_UNIFORM:
474 			size = sizeof(struct crush_bucket_uniform);
475 			break;
476 		case CRUSH_BUCKET_LIST:
477 			size = sizeof(struct crush_bucket_list);
478 			break;
479 		case CRUSH_BUCKET_TREE:
480 			size = sizeof(struct crush_bucket_tree);
481 			break;
482 		case CRUSH_BUCKET_STRAW:
483 			size = sizeof(struct crush_bucket_straw);
484 			break;
485 		case CRUSH_BUCKET_STRAW2:
486 			size = sizeof(struct crush_bucket_straw2);
487 			break;
488 		default:
489 			goto bad;
490 		}
491 		BUG_ON(size == 0);
492 		b = c->buckets[i] = kzalloc(size, GFP_NOFS);
493 		if (b == NULL)
494 			goto badmem;
495 
496 		ceph_decode_need(p, end, 4*sizeof(u32), bad);
497 		b->id = ceph_decode_32(p);
498 		b->type = ceph_decode_16(p);
499 		b->alg = ceph_decode_8(p);
500 		b->hash = ceph_decode_8(p);
501 		b->weight = ceph_decode_32(p);
502 		b->size = ceph_decode_32(p);
503 
504 		dout("crush_decode bucket size %d off %x %p to %p\n",
505 		     b->size, (int)(*p-start), *p, end);
506 
507 		b->items = kcalloc(b->size, sizeof(__s32), GFP_NOFS);
508 		if (b->items == NULL)
509 			goto badmem;
510 
511 		ceph_decode_need(p, end, b->size*sizeof(u32), bad);
512 		for (j = 0; j < b->size; j++)
513 			b->items[j] = ceph_decode_32(p);
514 
515 		switch (b->alg) {
516 		case CRUSH_BUCKET_UNIFORM:
517 			err = crush_decode_uniform_bucket(p, end,
518 				  (struct crush_bucket_uniform *)b);
519 			if (err < 0)
520 				goto fail;
521 			break;
522 		case CRUSH_BUCKET_LIST:
523 			err = crush_decode_list_bucket(p, end,
524 			       (struct crush_bucket_list *)b);
525 			if (err < 0)
526 				goto fail;
527 			break;
528 		case CRUSH_BUCKET_TREE:
529 			err = crush_decode_tree_bucket(p, end,
530 				(struct crush_bucket_tree *)b);
531 			if (err < 0)
532 				goto fail;
533 			break;
534 		case CRUSH_BUCKET_STRAW:
535 			err = crush_decode_straw_bucket(p, end,
536 				(struct crush_bucket_straw *)b);
537 			if (err < 0)
538 				goto fail;
539 			break;
540 		case CRUSH_BUCKET_STRAW2:
541 			err = crush_decode_straw2_bucket(p, end,
542 				(struct crush_bucket_straw2 *)b);
543 			if (err < 0)
544 				goto fail;
545 			break;
546 		}
547 	}
548 
549 	/* rules */
550 	dout("rule vec is %p\n", c->rules);
551 	for (i = 0; i < c->max_rules; i++) {
552 		u32 yes;
553 		struct crush_rule *r;
554 
555 		ceph_decode_32_safe(p, end, yes, bad);
556 		if (!yes) {
557 			dout("crush_decode NO rule %d off %x %p to %p\n",
558 			     i, (int)(*p-start), *p, end);
559 			c->rules[i] = NULL;
560 			continue;
561 		}
562 
563 		dout("crush_decode rule %d off %x %p to %p\n",
564 		     i, (int)(*p-start), *p, end);
565 
566 		/* len */
567 		ceph_decode_32_safe(p, end, yes, bad);
568 #if BITS_PER_LONG == 32
569 		if (yes > (ULONG_MAX - sizeof(*r))
570 			  / sizeof(struct crush_rule_step))
571 			goto bad;
572 #endif
573 		r = kmalloc(struct_size(r, steps, yes), GFP_NOFS);
574 		c->rules[i] = r;
575 		if (r == NULL)
576 			goto badmem;
577 		dout(" rule %d is at %p\n", i, r);
578 		r->len = yes;
579 		ceph_decode_copy_safe(p, end, &r->mask, 4, bad); /* 4 u8's */
580 		ceph_decode_need(p, end, r->len*3*sizeof(u32), bad);
581 		for (j = 0; j < r->len; j++) {
582 			r->steps[j].op = ceph_decode_32(p);
583 			r->steps[j].arg1 = ceph_decode_32(p);
584 			r->steps[j].arg2 = ceph_decode_32(p);
585 		}
586 	}
587 
588 	err = decode_crush_names(p, end, &c->type_names);
589 	if (err)
590 		goto fail;
591 
592 	err = decode_crush_names(p, end, &c->names);
593 	if (err)
594 		goto fail;
595 
596 	ceph_decode_skip_map(p, end, 32, string, bad); /* rule_name_map */
597 
598         /* tunables */
599         ceph_decode_need(p, end, 3*sizeof(u32), done);
600         c->choose_local_tries = ceph_decode_32(p);
601         c->choose_local_fallback_tries =  ceph_decode_32(p);
602         c->choose_total_tries = ceph_decode_32(p);
603         dout("crush decode tunable choose_local_tries = %d\n",
604              c->choose_local_tries);
605         dout("crush decode tunable choose_local_fallback_tries = %d\n",
606              c->choose_local_fallback_tries);
607         dout("crush decode tunable choose_total_tries = %d\n",
608              c->choose_total_tries);
609 
610 	ceph_decode_need(p, end, sizeof(u32), done);
611 	c->chooseleaf_descend_once = ceph_decode_32(p);
612 	dout("crush decode tunable chooseleaf_descend_once = %d\n",
613 	     c->chooseleaf_descend_once);
614 
615 	ceph_decode_need(p, end, sizeof(u8), done);
616 	c->chooseleaf_vary_r = ceph_decode_8(p);
617 	dout("crush decode tunable chooseleaf_vary_r = %d\n",
618 	     c->chooseleaf_vary_r);
619 
620 	/* skip straw_calc_version, allowed_bucket_algs */
621 	ceph_decode_need(p, end, sizeof(u8) + sizeof(u32), done);
622 	*p += sizeof(u8) + sizeof(u32);
623 
624 	ceph_decode_need(p, end, sizeof(u8), done);
625 	c->chooseleaf_stable = ceph_decode_8(p);
626 	dout("crush decode tunable chooseleaf_stable = %d\n",
627 	     c->chooseleaf_stable);
628 
629 	if (*p != end) {
630 		/* class_map */
631 		ceph_decode_skip_map(p, end, 32, 32, bad);
632 		/* class_name */
633 		ceph_decode_skip_map(p, end, 32, string, bad);
634 		/* class_bucket */
635 		ceph_decode_skip_map_of_map(p, end, 32, 32, 32, bad);
636 	}
637 
638 	if (*p != end) {
639 		err = decode_choose_args(p, end, c);
640 		if (err)
641 			goto fail;
642 	}
643 
644 done:
645 	crush_finalize(c);
646 	dout("crush_decode success\n");
647 	return c;
648 
649 badmem:
650 	err = -ENOMEM;
651 fail:
652 	dout("crush_decode fail %d\n", err);
653 	crush_destroy(c);
654 	return ERR_PTR(err);
655 
656 bad:
657 	err = -EINVAL;
658 	goto fail;
659 }
660 
661 int ceph_pg_compare(const struct ceph_pg *lhs, const struct ceph_pg *rhs)
662 {
663 	if (lhs->pool < rhs->pool)
664 		return -1;
665 	if (lhs->pool > rhs->pool)
666 		return 1;
667 	if (lhs->seed < rhs->seed)
668 		return -1;
669 	if (lhs->seed > rhs->seed)
670 		return 1;
671 
672 	return 0;
673 }
674 
675 int ceph_spg_compare(const struct ceph_spg *lhs, const struct ceph_spg *rhs)
676 {
677 	int ret;
678 
679 	ret = ceph_pg_compare(&lhs->pgid, &rhs->pgid);
680 	if (ret)
681 		return ret;
682 
683 	if (lhs->shard < rhs->shard)
684 		return -1;
685 	if (lhs->shard > rhs->shard)
686 		return 1;
687 
688 	return 0;
689 }
690 
691 static struct ceph_pg_mapping *alloc_pg_mapping(size_t payload_len)
692 {
693 	struct ceph_pg_mapping *pg;
694 
695 	pg = kmalloc(sizeof(*pg) + payload_len, GFP_NOIO);
696 	if (!pg)
697 		return NULL;
698 
699 	RB_CLEAR_NODE(&pg->node);
700 	return pg;
701 }
702 
703 static void free_pg_mapping(struct ceph_pg_mapping *pg)
704 {
705 	WARN_ON(!RB_EMPTY_NODE(&pg->node));
706 
707 	kfree(pg);
708 }
709 
/*
 * rbtree of pg_mapping for handling pg_temp (explicit mapping of pgid
 * to a set of osds) and primary_temp (explicit primary setting).
 *
 * Entries are keyed by pgid, passed by pointer and ordered with
 * ceph_pg_compare().  This file uses lookup_pg_mapping() and
 * erase_pg_mapping() from the generated set.
 */
DEFINE_RB_FUNCS2(pg_mapping, struct ceph_pg_mapping, pgid, ceph_pg_compare,
		 RB_BYPTR, const struct ceph_pg *, node)
716 
/*
 * rbtree of pg pool info, keyed by pool id.  This file uses
 * lookup_pg_pool(), __insert_pg_pool() and erase_pg_pool() from the
 * generated set.
 */
DEFINE_RB_FUNCS(pg_pool, struct ceph_pg_pool_info, id, node)
721 
722 struct ceph_pg_pool_info *ceph_pg_pool_by_id(struct ceph_osdmap *map, u64 id)
723 {
724 	return lookup_pg_pool(&map->pg_pools, id);
725 }
726 
727 const char *ceph_pg_pool_name_by_id(struct ceph_osdmap *map, u64 id)
728 {
729 	struct ceph_pg_pool_info *pi;
730 
731 	if (id == CEPH_NOPOOL)
732 		return NULL;
733 
734 	if (WARN_ON_ONCE(id > (u64) INT_MAX))
735 		return NULL;
736 
737 	pi = lookup_pg_pool(&map->pg_pools, id);
738 	return pi ? pi->name : NULL;
739 }
740 EXPORT_SYMBOL(ceph_pg_pool_name_by_id);
741 
742 int ceph_pg_poolid_by_name(struct ceph_osdmap *map, const char *name)
743 {
744 	struct rb_node *rbp;
745 
746 	for (rbp = rb_first(&map->pg_pools); rbp; rbp = rb_next(rbp)) {
747 		struct ceph_pg_pool_info *pi =
748 			rb_entry(rbp, struct ceph_pg_pool_info, node);
749 		if (pi->name && strcmp(pi->name, name) == 0)
750 			return pi->id;
751 	}
752 	return -ENOENT;
753 }
754 EXPORT_SYMBOL(ceph_pg_poolid_by_name);
755 
756 u64 ceph_pg_pool_flags(struct ceph_osdmap *map, u64 id)
757 {
758 	struct ceph_pg_pool_info *pi;
759 
760 	pi = lookup_pg_pool(&map->pg_pools, id);
761 	return pi ? pi->flags : 0;
762 }
763 EXPORT_SYMBOL(ceph_pg_pool_flags);
764 
765 static void __remove_pg_pool(struct rb_root *root, struct ceph_pg_pool_info *pi)
766 {
767 	erase_pg_pool(root, pi);
768 	kfree(pi->name);
769 	kfree(pi);
770 }
771 
772 static int decode_pool(void **p, void *end, struct ceph_pg_pool_info *pi)
773 {
774 	u8 ev, cv;
775 	unsigned len, num;
776 	void *pool_end;
777 
778 	ceph_decode_need(p, end, 2 + 4, bad);
779 	ev = ceph_decode_8(p);  /* encoding version */
780 	cv = ceph_decode_8(p); /* compat version */
781 	if (ev < 5) {
782 		pr_warn("got v %d < 5 cv %d of ceph_pg_pool\n", ev, cv);
783 		return -EINVAL;
784 	}
785 	if (cv > 9) {
786 		pr_warn("got v %d cv %d > 9 of ceph_pg_pool\n", ev, cv);
787 		return -EINVAL;
788 	}
789 	len = ceph_decode_32(p);
790 	ceph_decode_need(p, end, len, bad);
791 	pool_end = *p + len;
792 
793 	pi->type = ceph_decode_8(p);
794 	pi->size = ceph_decode_8(p);
795 	pi->crush_ruleset = ceph_decode_8(p);
796 	pi->object_hash = ceph_decode_8(p);
797 
798 	pi->pg_num = ceph_decode_32(p);
799 	pi->pgp_num = ceph_decode_32(p);
800 
801 	*p += 4 + 4;  /* skip lpg* */
802 	*p += 4;      /* skip last_change */
803 	*p += 8 + 4;  /* skip snap_seq, snap_epoch */
804 
805 	/* skip snaps */
806 	num = ceph_decode_32(p);
807 	while (num--) {
808 		*p += 8;  /* snapid key */
809 		*p += 1 + 1; /* versions */
810 		len = ceph_decode_32(p);
811 		*p += len;
812 	}
813 
814 	/* skip removed_snaps */
815 	num = ceph_decode_32(p);
816 	*p += num * (8 + 8);
817 
818 	*p += 8;  /* skip auid */
819 	pi->flags = ceph_decode_64(p);
820 	*p += 4;  /* skip crash_replay_interval */
821 
822 	if (ev >= 7)
823 		pi->min_size = ceph_decode_8(p);
824 	else
825 		pi->min_size = pi->size - pi->size / 2;
826 
827 	if (ev >= 8)
828 		*p += 8 + 8;  /* skip quota_max_* */
829 
830 	if (ev >= 9) {
831 		/* skip tiers */
832 		num = ceph_decode_32(p);
833 		*p += num * 8;
834 
835 		*p += 8;  /* skip tier_of */
836 		*p += 1;  /* skip cache_mode */
837 
838 		pi->read_tier = ceph_decode_64(p);
839 		pi->write_tier = ceph_decode_64(p);
840 	} else {
841 		pi->read_tier = -1;
842 		pi->write_tier = -1;
843 	}
844 
845 	if (ev >= 10) {
846 		/* skip properties */
847 		num = ceph_decode_32(p);
848 		while (num--) {
849 			len = ceph_decode_32(p);
850 			*p += len; /* key */
851 			len = ceph_decode_32(p);
852 			*p += len; /* val */
853 		}
854 	}
855 
856 	if (ev >= 11) {
857 		/* skip hit_set_params */
858 		*p += 1 + 1; /* versions */
859 		len = ceph_decode_32(p);
860 		*p += len;
861 
862 		*p += 4; /* skip hit_set_period */
863 		*p += 4; /* skip hit_set_count */
864 	}
865 
866 	if (ev >= 12)
867 		*p += 4; /* skip stripe_width */
868 
869 	if (ev >= 13) {
870 		*p += 8; /* skip target_max_bytes */
871 		*p += 8; /* skip target_max_objects */
872 		*p += 4; /* skip cache_target_dirty_ratio_micro */
873 		*p += 4; /* skip cache_target_full_ratio_micro */
874 		*p += 4; /* skip cache_min_flush_age */
875 		*p += 4; /* skip cache_min_evict_age */
876 	}
877 
878 	if (ev >=  14) {
879 		/* skip erasure_code_profile */
880 		len = ceph_decode_32(p);
881 		*p += len;
882 	}
883 
884 	/*
885 	 * last_force_op_resend_preluminous, will be overridden if the
886 	 * map was encoded with RESEND_ON_SPLIT
887 	 */
888 	if (ev >= 15)
889 		pi->last_force_request_resend = ceph_decode_32(p);
890 	else
891 		pi->last_force_request_resend = 0;
892 
893 	if (ev >= 16)
894 		*p += 4; /* skip min_read_recency_for_promote */
895 
896 	if (ev >= 17)
897 		*p += 8; /* skip expected_num_objects */
898 
899 	if (ev >= 19)
900 		*p += 4; /* skip cache_target_dirty_high_ratio_micro */
901 
902 	if (ev >= 20)
903 		*p += 4; /* skip min_write_recency_for_promote */
904 
905 	if (ev >= 21)
906 		*p += 1; /* skip use_gmt_hitset */
907 
908 	if (ev >= 22)
909 		*p += 1; /* skip fast_read */
910 
911 	if (ev >= 23) {
912 		*p += 4; /* skip hit_set_grade_decay_rate */
913 		*p += 4; /* skip hit_set_search_last_n */
914 	}
915 
916 	if (ev >= 24) {
917 		/* skip opts */
918 		*p += 1 + 1; /* versions */
919 		len = ceph_decode_32(p);
920 		*p += len;
921 	}
922 
923 	if (ev >= 25)
924 		pi->last_force_request_resend = ceph_decode_32(p);
925 
926 	/* ignore the rest */
927 
928 	*p = pool_end;
929 	calc_pg_masks(pi);
930 	return 0;
931 
932 bad:
933 	return -EINVAL;
934 }
935 
936 static int decode_pool_names(void **p, void *end, struct ceph_osdmap *map)
937 {
938 	struct ceph_pg_pool_info *pi;
939 	u32 num, len;
940 	u64 pool;
941 
942 	ceph_decode_32_safe(p, end, num, bad);
943 	dout(" %d pool names\n", num);
944 	while (num--) {
945 		ceph_decode_64_safe(p, end, pool, bad);
946 		ceph_decode_32_safe(p, end, len, bad);
947 		dout("  pool %llu len %d\n", pool, len);
948 		ceph_decode_need(p, end, len, bad);
949 		pi = lookup_pg_pool(&map->pg_pools, pool);
950 		if (pi) {
951 			char *name = kstrndup(*p, len, GFP_NOFS);
952 
953 			if (!name)
954 				return -ENOMEM;
955 			kfree(pi->name);
956 			pi->name = name;
957 			dout("  name is %s\n", pi->name);
958 		}
959 		*p += len;
960 	}
961 	return 0;
962 
963 bad:
964 	return -EINVAL;
965 }
966 
967 /*
968  * osd map
969  */
970 struct ceph_osdmap *ceph_osdmap_alloc(void)
971 {
972 	struct ceph_osdmap *map;
973 
974 	map = kzalloc(sizeof(*map), GFP_NOIO);
975 	if (!map)
976 		return NULL;
977 
978 	map->pg_pools = RB_ROOT;
979 	map->pool_max = -1;
980 	map->pg_temp = RB_ROOT;
981 	map->primary_temp = RB_ROOT;
982 	map->pg_upmap = RB_ROOT;
983 	map->pg_upmap_items = RB_ROOT;
984 	mutex_init(&map->crush_workspace_mutex);
985 
986 	return map;
987 }
988 
989 void ceph_osdmap_destroy(struct ceph_osdmap *map)
990 {
991 	dout("osdmap_destroy %p\n", map);
992 	if (map->crush)
993 		crush_destroy(map->crush);
994 	while (!RB_EMPTY_ROOT(&map->pg_temp)) {
995 		struct ceph_pg_mapping *pg =
996 			rb_entry(rb_first(&map->pg_temp),
997 				 struct ceph_pg_mapping, node);
998 		erase_pg_mapping(&map->pg_temp, pg);
999 		free_pg_mapping(pg);
1000 	}
1001 	while (!RB_EMPTY_ROOT(&map->primary_temp)) {
1002 		struct ceph_pg_mapping *pg =
1003 			rb_entry(rb_first(&map->primary_temp),
1004 				 struct ceph_pg_mapping, node);
1005 		erase_pg_mapping(&map->primary_temp, pg);
1006 		free_pg_mapping(pg);
1007 	}
1008 	while (!RB_EMPTY_ROOT(&map->pg_upmap)) {
1009 		struct ceph_pg_mapping *pg =
1010 			rb_entry(rb_first(&map->pg_upmap),
1011 				 struct ceph_pg_mapping, node);
1012 		rb_erase(&pg->node, &map->pg_upmap);
1013 		kfree(pg);
1014 	}
1015 	while (!RB_EMPTY_ROOT(&map->pg_upmap_items)) {
1016 		struct ceph_pg_mapping *pg =
1017 			rb_entry(rb_first(&map->pg_upmap_items),
1018 				 struct ceph_pg_mapping, node);
1019 		rb_erase(&pg->node, &map->pg_upmap_items);
1020 		kfree(pg);
1021 	}
1022 	while (!RB_EMPTY_ROOT(&map->pg_pools)) {
1023 		struct ceph_pg_pool_info *pi =
1024 			rb_entry(rb_first(&map->pg_pools),
1025 				 struct ceph_pg_pool_info, node);
1026 		__remove_pg_pool(&map->pg_pools, pi);
1027 	}
1028 	kvfree(map->osd_state);
1029 	kvfree(map->osd_weight);
1030 	kvfree(map->osd_addr);
1031 	kvfree(map->osd_primary_affinity);
1032 	kvfree(map->crush_workspace);
1033 	kfree(map);
1034 }
1035 
1036 /*
1037  * Adjust max_osd value, (re)allocate arrays.
1038  *
1039  * The new elements are properly initialized.
1040  */
1041 static int osdmap_set_max_osd(struct ceph_osdmap *map, u32 max)
1042 {
1043 	u32 *state;
1044 	u32 *weight;
1045 	struct ceph_entity_addr *addr;
1046 	u32 to_copy;
1047 	int i;
1048 
1049 	dout("%s old %u new %u\n", __func__, map->max_osd, max);
1050 	if (max == map->max_osd)
1051 		return 0;
1052 
1053 	state = ceph_kvmalloc(array_size(max, sizeof(*state)), GFP_NOFS);
1054 	weight = ceph_kvmalloc(array_size(max, sizeof(*weight)), GFP_NOFS);
1055 	addr = ceph_kvmalloc(array_size(max, sizeof(*addr)), GFP_NOFS);
1056 	if (!state || !weight || !addr) {
1057 		kvfree(state);
1058 		kvfree(weight);
1059 		kvfree(addr);
1060 		return -ENOMEM;
1061 	}
1062 
1063 	to_copy = min(map->max_osd, max);
1064 	if (map->osd_state) {
1065 		memcpy(state, map->osd_state, to_copy * sizeof(*state));
1066 		memcpy(weight, map->osd_weight, to_copy * sizeof(*weight));
1067 		memcpy(addr, map->osd_addr, to_copy * sizeof(*addr));
1068 		kvfree(map->osd_state);
1069 		kvfree(map->osd_weight);
1070 		kvfree(map->osd_addr);
1071 	}
1072 
1073 	map->osd_state = state;
1074 	map->osd_weight = weight;
1075 	map->osd_addr = addr;
1076 	for (i = map->max_osd; i < max; i++) {
1077 		map->osd_state[i] = 0;
1078 		map->osd_weight[i] = CEPH_OSD_OUT;
1079 		memset(map->osd_addr + i, 0, sizeof(*map->osd_addr));
1080 	}
1081 
1082 	if (map->osd_primary_affinity) {
1083 		u32 *affinity;
1084 
1085 		affinity = ceph_kvmalloc(array_size(max, sizeof(*affinity)),
1086 					 GFP_NOFS);
1087 		if (!affinity)
1088 			return -ENOMEM;
1089 
1090 		memcpy(affinity, map->osd_primary_affinity,
1091 		       to_copy * sizeof(*affinity));
1092 		kvfree(map->osd_primary_affinity);
1093 
1094 		map->osd_primary_affinity = affinity;
1095 		for (i = map->max_osd; i < max; i++)
1096 			map->osd_primary_affinity[i] =
1097 			    CEPH_OSD_DEFAULT_PRIMARY_AFFINITY;
1098 	}
1099 
1100 	map->max_osd = max;
1101 
1102 	return 0;
1103 }
1104 
1105 static int osdmap_set_crush(struct ceph_osdmap *map, struct crush_map *crush)
1106 {
1107 	void *workspace;
1108 	size_t work_size;
1109 
1110 	if (IS_ERR(crush))
1111 		return PTR_ERR(crush);
1112 
1113 	work_size = crush_work_size(crush, CEPH_PG_MAX_SIZE);
1114 	dout("%s work_size %zu bytes\n", __func__, work_size);
1115 	workspace = ceph_kvmalloc(work_size, GFP_NOIO);
1116 	if (!workspace) {
1117 		crush_destroy(crush);
1118 		return -ENOMEM;
1119 	}
1120 	crush_init_workspace(crush, workspace);
1121 
1122 	if (map->crush)
1123 		crush_destroy(map->crush);
1124 	kvfree(map->crush_workspace);
1125 	map->crush = crush;
1126 	map->crush_workspace = workspace;
1127 	return 0;
1128 }
1129 
/*
 * Highest struct_compat values accepted by get_osdmap_client_data_v()
 * for the osdmap wrapper and for its client data section; anything
 * newer is rejected with -EINVAL.
 */
#define OSDMAP_WRAPPER_COMPAT_VER	7
#define OSDMAP_CLIENT_DATA_COMPAT_VER	1
1132 
1133 /*
1134  * Return 0 or error.  On success, *v is set to 0 for old (v6) osdmaps,
1135  * to struct_v of the client_data section for new (v7 and above)
1136  * osdmaps.
1137  */
1138 static int get_osdmap_client_data_v(void **p, void *end,
1139 				    const char *prefix, u8 *v)
1140 {
1141 	u8 struct_v;
1142 
1143 	ceph_decode_8_safe(p, end, struct_v, e_inval);
1144 	if (struct_v >= 7) {
1145 		u8 struct_compat;
1146 
1147 		ceph_decode_8_safe(p, end, struct_compat, e_inval);
1148 		if (struct_compat > OSDMAP_WRAPPER_COMPAT_VER) {
1149 			pr_warn("got v %d cv %d > %d of %s ceph_osdmap\n",
1150 				struct_v, struct_compat,
1151 				OSDMAP_WRAPPER_COMPAT_VER, prefix);
1152 			return -EINVAL;
1153 		}
1154 		*p += 4; /* ignore wrapper struct_len */
1155 
1156 		ceph_decode_8_safe(p, end, struct_v, e_inval);
1157 		ceph_decode_8_safe(p, end, struct_compat, e_inval);
1158 		if (struct_compat > OSDMAP_CLIENT_DATA_COMPAT_VER) {
1159 			pr_warn("got v %d cv %d > %d of %s ceph_osdmap client data\n",
1160 				struct_v, struct_compat,
1161 				OSDMAP_CLIENT_DATA_COMPAT_VER, prefix);
1162 			return -EINVAL;
1163 		}
1164 		*p += 4; /* ignore client data struct_len */
1165 	} else {
1166 		u16 version;
1167 
1168 		*p -= 1;
1169 		ceph_decode_16_safe(p, end, version, e_inval);
1170 		if (version < 6) {
1171 			pr_warn("got v %d < 6 of %s ceph_osdmap\n",
1172 				version, prefix);
1173 			return -EINVAL;
1174 		}
1175 
1176 		/* old osdmap enconding */
1177 		struct_v = 0;
1178 	}
1179 
1180 	*v = struct_v;
1181 	return 0;
1182 
1183 e_inval:
1184 	return -EINVAL;
1185 }
1186 
1187 static int __decode_pools(void **p, void *end, struct ceph_osdmap *map,
1188 			  bool incremental)
1189 {
1190 	u32 n;
1191 
1192 	ceph_decode_32_safe(p, end, n, e_inval);
1193 	while (n--) {
1194 		struct ceph_pg_pool_info *pi;
1195 		u64 pool;
1196 		int ret;
1197 
1198 		ceph_decode_64_safe(p, end, pool, e_inval);
1199 
1200 		pi = lookup_pg_pool(&map->pg_pools, pool);
1201 		if (!incremental || !pi) {
1202 			pi = kzalloc(sizeof(*pi), GFP_NOFS);
1203 			if (!pi)
1204 				return -ENOMEM;
1205 
1206 			RB_CLEAR_NODE(&pi->node);
1207 			pi->id = pool;
1208 
1209 			if (!__insert_pg_pool(&map->pg_pools, pi)) {
1210 				kfree(pi);
1211 				return -EEXIST;
1212 			}
1213 		}
1214 
1215 		ret = decode_pool(p, end, pi);
1216 		if (ret)
1217 			return ret;
1218 	}
1219 
1220 	return 0;
1221 
1222 e_inval:
1223 	return -EINVAL;
1224 }
1225 
/*
 * Decode a pool list with full-map semantics: forwards to
 * __decode_pools() with incremental == false.
 */
static int decode_pools(void **p, void *end, struct ceph_osdmap *map)
{
	return __decode_pools(p, end, map, false);
}
1230 
/*
 * Decode a pool list with incremental semantics: forwards to
 * __decode_pools() with incremental == true.
 */
static int decode_new_pools(void **p, void *end, struct ceph_osdmap *map)
{
	return __decode_pools(p, end, map, true);
}
1235 
1236 typedef struct ceph_pg_mapping *(*decode_mapping_fn_t)(void **, void *, bool);
1237 
1238 static int decode_pg_mapping(void **p, void *end, struct rb_root *mapping_root,
1239 			     decode_mapping_fn_t fn, bool incremental)
1240 {
1241 	u32 n;
1242 
1243 	WARN_ON(!incremental && !fn);
1244 
1245 	ceph_decode_32_safe(p, end, n, e_inval);
1246 	while (n--) {
1247 		struct ceph_pg_mapping *pg;
1248 		struct ceph_pg pgid;
1249 		int ret;
1250 
1251 		ret = ceph_decode_pgid(p, end, &pgid);
1252 		if (ret)
1253 			return ret;
1254 
1255 		pg = lookup_pg_mapping(mapping_root, &pgid);
1256 		if (pg) {
1257 			WARN_ON(!incremental);
1258 			erase_pg_mapping(mapping_root, pg);
1259 			free_pg_mapping(pg);
1260 		}
1261 
1262 		if (fn) {
1263 			pg = fn(p, end, incremental);
1264 			if (IS_ERR(pg))
1265 				return PTR_ERR(pg);
1266 
1267 			if (pg) {
1268 				pg->pgid = pgid; /* struct */
1269 				insert_pg_mapping(mapping_root, pg);
1270 			}
1271 		}
1272 	}
1273 
1274 	return 0;
1275 
1276 e_inval:
1277 	return -EINVAL;
1278 }
1279 
1280 static struct ceph_pg_mapping *__decode_pg_temp(void **p, void *end,
1281 						bool incremental)
1282 {
1283 	struct ceph_pg_mapping *pg;
1284 	u32 len, i;
1285 
1286 	ceph_decode_32_safe(p, end, len, e_inval);
1287 	if (len == 0 && incremental)
1288 		return NULL;	/* new_pg_temp: [] to remove */
1289 	if (len > (SIZE_MAX - sizeof(*pg)) / sizeof(u32))
1290 		return ERR_PTR(-EINVAL);
1291 
1292 	ceph_decode_need(p, end, len * sizeof(u32), e_inval);
1293 	pg = alloc_pg_mapping(len * sizeof(u32));
1294 	if (!pg)
1295 		return ERR_PTR(-ENOMEM);
1296 
1297 	pg->pg_temp.len = len;
1298 	for (i = 0; i < len; i++)
1299 		pg->pg_temp.osds[i] = ceph_decode_32(p);
1300 
1301 	return pg;
1302 
1303 e_inval:
1304 	return ERR_PTR(-EINVAL);
1305 }
1306 
/*
 * Decode a pg_temp list into map->pg_temp with full-map semantics
 * (incremental == false), using __decode_pg_temp() for the values.
 */
static int decode_pg_temp(void **p, void *end, struct ceph_osdmap *map)
{
	return decode_pg_mapping(p, end, &map->pg_temp, __decode_pg_temp,
				 false);
}
1312 
/*
 * Decode a new_pg_temp list into map->pg_temp with incremental
 * semantics (incremental == true), using __decode_pg_temp() for the
 * values.
 */
static int decode_new_pg_temp(void **p, void *end, struct ceph_osdmap *map)
{
	return decode_pg_mapping(p, end, &map->pg_temp, __decode_pg_temp,
				 true);
}
1318 
1319 static struct ceph_pg_mapping *__decode_primary_temp(void **p, void *end,
1320 						     bool incremental)
1321 {
1322 	struct ceph_pg_mapping *pg;
1323 	u32 osd;
1324 
1325 	ceph_decode_32_safe(p, end, osd, e_inval);
1326 	if (osd == (u32)-1 && incremental)
1327 		return NULL;	/* new_primary_temp: -1 to remove */
1328 
1329 	pg = alloc_pg_mapping(0);
1330 	if (!pg)
1331 		return ERR_PTR(-ENOMEM);
1332 
1333 	pg->primary_temp.osd = osd;
1334 	return pg;
1335 
1336 e_inval:
1337 	return ERR_PTR(-EINVAL);
1338 }
1339 
/*
 * Decode a primary_temp list into map->primary_temp with full-map
 * semantics (incremental == false), using __decode_primary_temp() for
 * the values.
 */
static int decode_primary_temp(void **p, void *end, struct ceph_osdmap *map)
{
	return decode_pg_mapping(p, end, &map->primary_temp,
				 __decode_primary_temp, false);
}
1345 
/*
 * Decode a new_primary_temp list into map->primary_temp with
 * incremental semantics (incremental == true), using
 * __decode_primary_temp() for the values.
 */
static int decode_new_primary_temp(void **p, void *end,
				   struct ceph_osdmap *map)
{
	return decode_pg_mapping(p, end, &map->primary_temp,
				 __decode_primary_temp, true);
}
1352 
1353 u32 ceph_get_primary_affinity(struct ceph_osdmap *map, int osd)
1354 {
1355 	BUG_ON(osd >= map->max_osd);
1356 
1357 	if (!map->osd_primary_affinity)
1358 		return CEPH_OSD_DEFAULT_PRIMARY_AFFINITY;
1359 
1360 	return map->osd_primary_affinity[osd];
1361 }
1362 
1363 static int set_primary_affinity(struct ceph_osdmap *map, int osd, u32 aff)
1364 {
1365 	BUG_ON(osd >= map->max_osd);
1366 
1367 	if (!map->osd_primary_affinity) {
1368 		int i;
1369 
1370 		map->osd_primary_affinity = ceph_kvmalloc(
1371 		    array_size(map->max_osd, sizeof(*map->osd_primary_affinity)),
1372 		    GFP_NOFS);
1373 		if (!map->osd_primary_affinity)
1374 			return -ENOMEM;
1375 
1376 		for (i = 0; i < map->max_osd; i++)
1377 			map->osd_primary_affinity[i] =
1378 			    CEPH_OSD_DEFAULT_PRIMARY_AFFINITY;
1379 	}
1380 
1381 	map->osd_primary_affinity[osd] = aff;
1382 
1383 	return 0;
1384 }
1385 
1386 static int decode_primary_affinity(void **p, void *end,
1387 				   struct ceph_osdmap *map)
1388 {
1389 	u32 len, i;
1390 
1391 	ceph_decode_32_safe(p, end, len, e_inval);
1392 	if (len == 0) {
1393 		kvfree(map->osd_primary_affinity);
1394 		map->osd_primary_affinity = NULL;
1395 		return 0;
1396 	}
1397 	if (len != map->max_osd)
1398 		goto e_inval;
1399 
1400 	ceph_decode_need(p, end, map->max_osd*sizeof(u32), e_inval);
1401 
1402 	for (i = 0; i < map->max_osd; i++) {
1403 		int ret;
1404 
1405 		ret = set_primary_affinity(map, i, ceph_decode_32(p));
1406 		if (ret)
1407 			return ret;
1408 	}
1409 
1410 	return 0;
1411 
1412 e_inval:
1413 	return -EINVAL;
1414 }
1415 
1416 static int decode_new_primary_affinity(void **p, void *end,
1417 				       struct ceph_osdmap *map)
1418 {
1419 	u32 n;
1420 
1421 	ceph_decode_32_safe(p, end, n, e_inval);
1422 	while (n--) {
1423 		u32 osd, aff;
1424 		int ret;
1425 
1426 		ceph_decode_32_safe(p, end, osd, e_inval);
1427 		ceph_decode_32_safe(p, end, aff, e_inval);
1428 
1429 		ret = set_primary_affinity(map, osd, aff);
1430 		if (ret)
1431 			return ret;
1432 
1433 		pr_info("osd%d primary-affinity 0x%x\n", osd, aff);
1434 	}
1435 
1436 	return 0;
1437 
1438 e_inval:
1439 	return -EINVAL;
1440 }
1441 
/*
 * Decode one pg_upmap value.  Reuses __decode_pg_temp() and always
 * passes incremental == false to it, regardless of the flag given here,
 * so an empty list is never turned into NULL.
 */
static struct ceph_pg_mapping *__decode_pg_upmap(void **p, void *end,
						 bool __unused)
{
	return __decode_pg_temp(p, end, false);
}
1447 
/*
 * Decode a pg_upmap list into map->pg_upmap with full-map semantics
 * (incremental == false).
 */
static int decode_pg_upmap(void **p, void *end, struct ceph_osdmap *map)
{
	return decode_pg_mapping(p, end, &map->pg_upmap, __decode_pg_upmap,
				 false);
}
1453 
/*
 * Decode a new_pg_upmap list into map->pg_upmap with incremental
 * semantics (incremental == true).
 */
static int decode_new_pg_upmap(void **p, void *end, struct ceph_osdmap *map)
{
	return decode_pg_mapping(p, end, &map->pg_upmap, __decode_pg_upmap,
				 true);
}
1459 
/*
 * Decode an old_pg_upmap list: no value decoder is passed, so each
 * listed pgid is only looked up in map->pg_upmap and removed if found.
 */
static int decode_old_pg_upmap(void **p, void *end, struct ceph_osdmap *map)
{
	return decode_pg_mapping(p, end, &map->pg_upmap, NULL, true);
}
1464 
1465 static struct ceph_pg_mapping *__decode_pg_upmap_items(void **p, void *end,
1466 						       bool __unused)
1467 {
1468 	struct ceph_pg_mapping *pg;
1469 	u32 len, i;
1470 
1471 	ceph_decode_32_safe(p, end, len, e_inval);
1472 	if (len > (SIZE_MAX - sizeof(*pg)) / (2 * sizeof(u32)))
1473 		return ERR_PTR(-EINVAL);
1474 
1475 	ceph_decode_need(p, end, 2 * len * sizeof(u32), e_inval);
1476 	pg = alloc_pg_mapping(2 * len * sizeof(u32));
1477 	if (!pg)
1478 		return ERR_PTR(-ENOMEM);
1479 
1480 	pg->pg_upmap_items.len = len;
1481 	for (i = 0; i < len; i++) {
1482 		pg->pg_upmap_items.from_to[i][0] = ceph_decode_32(p);
1483 		pg->pg_upmap_items.from_to[i][1] = ceph_decode_32(p);
1484 	}
1485 
1486 	return pg;
1487 
1488 e_inval:
1489 	return ERR_PTR(-EINVAL);
1490 }
1491 
/*
 * Decode a pg_upmap_items list into map->pg_upmap_items with full-map
 * semantics (incremental == false).
 */
static int decode_pg_upmap_items(void **p, void *end, struct ceph_osdmap *map)
{
	return decode_pg_mapping(p, end, &map->pg_upmap_items,
				 __decode_pg_upmap_items, false);
}
1497 
/*
 * Decode a new_pg_upmap_items list into map->pg_upmap_items with
 * incremental semantics (incremental == true).
 */
static int decode_new_pg_upmap_items(void **p, void *end,
				     struct ceph_osdmap *map)
{
	return decode_pg_mapping(p, end, &map->pg_upmap_items,
				 __decode_pg_upmap_items, true);
}
1504 
/*
 * Decode an old_pg_upmap_items list: no value decoder is passed, so
 * each listed pgid is only looked up in map->pg_upmap_items and removed
 * if found.
 */
static int decode_old_pg_upmap_items(void **p, void *end,
				     struct ceph_osdmap *map)
{
	return decode_pg_mapping(p, end, &map->pg_upmap_items, NULL, true);
}
1510 
1511 /*
1512  * decode a full map.
1513  */
1514 static int osdmap_decode(void **p, void *end, struct ceph_osdmap *map)
1515 {
1516 	u8 struct_v;
1517 	u32 epoch = 0;
1518 	void *start = *p;
1519 	u32 max;
1520 	u32 len, i;
1521 	int err;
1522 
1523 	dout("%s %p to %p len %d\n", __func__, *p, end, (int)(end - *p));
1524 
1525 	err = get_osdmap_client_data_v(p, end, "full", &struct_v);
1526 	if (err)
1527 		goto bad;
1528 
1529 	/* fsid, epoch, created, modified */
1530 	ceph_decode_need(p, end, sizeof(map->fsid) + sizeof(u32) +
1531 			 sizeof(map->created) + sizeof(map->modified), e_inval);
1532 	ceph_decode_copy(p, &map->fsid, sizeof(map->fsid));
1533 	epoch = map->epoch = ceph_decode_32(p);
1534 	ceph_decode_copy(p, &map->created, sizeof(map->created));
1535 	ceph_decode_copy(p, &map->modified, sizeof(map->modified));
1536 
1537 	/* pools */
1538 	err = decode_pools(p, end, map);
1539 	if (err)
1540 		goto bad;
1541 
1542 	/* pool_name */
1543 	err = decode_pool_names(p, end, map);
1544 	if (err)
1545 		goto bad;
1546 
1547 	ceph_decode_32_safe(p, end, map->pool_max, e_inval);
1548 
1549 	ceph_decode_32_safe(p, end, map->flags, e_inval);
1550 
1551 	/* max_osd */
1552 	ceph_decode_32_safe(p, end, max, e_inval);
1553 
1554 	/* (re)alloc osd arrays */
1555 	err = osdmap_set_max_osd(map, max);
1556 	if (err)
1557 		goto bad;
1558 
1559 	/* osd_state, osd_weight, osd_addrs->client_addr */
1560 	ceph_decode_need(p, end, 3*sizeof(u32) +
1561 			 map->max_osd*(struct_v >= 5 ? sizeof(u32) :
1562 						       sizeof(u8)) +
1563 				       sizeof(*map->osd_weight), e_inval);
1564 	if (ceph_decode_32(p) != map->max_osd)
1565 		goto e_inval;
1566 
1567 	if (struct_v >= 5) {
1568 		for (i = 0; i < map->max_osd; i++)
1569 			map->osd_state[i] = ceph_decode_32(p);
1570 	} else {
1571 		for (i = 0; i < map->max_osd; i++)
1572 			map->osd_state[i] = ceph_decode_8(p);
1573 	}
1574 
1575 	if (ceph_decode_32(p) != map->max_osd)
1576 		goto e_inval;
1577 
1578 	for (i = 0; i < map->max_osd; i++)
1579 		map->osd_weight[i] = ceph_decode_32(p);
1580 
1581 	if (ceph_decode_32(p) != map->max_osd)
1582 		goto e_inval;
1583 
1584 	for (i = 0; i < map->max_osd; i++) {
1585 		err = ceph_decode_entity_addr(p, end, &map->osd_addr[i]);
1586 		if (err)
1587 			goto bad;
1588 	}
1589 
1590 	/* pg_temp */
1591 	err = decode_pg_temp(p, end, map);
1592 	if (err)
1593 		goto bad;
1594 
1595 	/* primary_temp */
1596 	if (struct_v >= 1) {
1597 		err = decode_primary_temp(p, end, map);
1598 		if (err)
1599 			goto bad;
1600 	}
1601 
1602 	/* primary_affinity */
1603 	if (struct_v >= 2) {
1604 		err = decode_primary_affinity(p, end, map);
1605 		if (err)
1606 			goto bad;
1607 	} else {
1608 		WARN_ON(map->osd_primary_affinity);
1609 	}
1610 
1611 	/* crush */
1612 	ceph_decode_32_safe(p, end, len, e_inval);
1613 	err = osdmap_set_crush(map, crush_decode(*p, min(*p + len, end)));
1614 	if (err)
1615 		goto bad;
1616 
1617 	*p += len;
1618 	if (struct_v >= 3) {
1619 		/* erasure_code_profiles */
1620 		ceph_decode_skip_map_of_map(p, end, string, string, string,
1621 					    e_inval);
1622 	}
1623 
1624 	if (struct_v >= 4) {
1625 		err = decode_pg_upmap(p, end, map);
1626 		if (err)
1627 			goto bad;
1628 
1629 		err = decode_pg_upmap_items(p, end, map);
1630 		if (err)
1631 			goto bad;
1632 	} else {
1633 		WARN_ON(!RB_EMPTY_ROOT(&map->pg_upmap));
1634 		WARN_ON(!RB_EMPTY_ROOT(&map->pg_upmap_items));
1635 	}
1636 
1637 	/* ignore the rest */
1638 	*p = end;
1639 
1640 	dout("full osdmap epoch %d max_osd %d\n", map->epoch, map->max_osd);
1641 	return 0;
1642 
1643 e_inval:
1644 	err = -EINVAL;
1645 bad:
1646 	pr_err("corrupt full osdmap (%d) epoch %d off %d (%p of %p-%p)\n",
1647 	       err, epoch, (int)(*p - start), *p, start, end);
1648 	print_hex_dump(KERN_DEBUG, "osdmap: ",
1649 		       DUMP_PREFIX_OFFSET, 16, 1,
1650 		       start, end - start, true);
1651 	return err;
1652 }
1653 
1654 /*
1655  * Allocate and decode a full map.
1656  */
1657 struct ceph_osdmap *ceph_osdmap_decode(void **p, void *end)
1658 {
1659 	struct ceph_osdmap *map;
1660 	int ret;
1661 
1662 	map = ceph_osdmap_alloc();
1663 	if (!map)
1664 		return ERR_PTR(-ENOMEM);
1665 
1666 	ret = osdmap_decode(p, end, map);
1667 	if (ret) {
1668 		ceph_osdmap_destroy(map);
1669 		return ERR_PTR(ret);
1670 	}
1671 
1672 	return map;
1673 }
1674 
1675 /*
1676  * Encoding order is (new_up_client, new_state, new_weight).  Need to
1677  * apply in the (new_weight, new_state, new_up_client) order, because
1678  * an incremental map may look like e.g.
1679  *
1680  *     new_up_client: { osd=6, addr=... } # set osd_state and addr
1681  *     new_state: { osd=6, xorstate=EXISTS } # clear osd_state
1682  */
1683 static int decode_new_up_state_weight(void **p, void *end, u8 struct_v,
1684 				      struct ceph_osdmap *map)
1685 {
1686 	void *new_up_client;
1687 	void *new_state;
1688 	void *new_weight_end;
1689 	u32 len;
1690 	int i;
1691 
1692 	new_up_client = *p;
1693 	ceph_decode_32_safe(p, end, len, e_inval);
1694 	for (i = 0; i < len; ++i) {
1695 		struct ceph_entity_addr addr;
1696 
1697 		ceph_decode_skip_32(p, end, e_inval);
1698 		if (ceph_decode_entity_addr(p, end, &addr))
1699 			goto e_inval;
1700 	}
1701 
1702 	new_state = *p;
1703 	ceph_decode_32_safe(p, end, len, e_inval);
1704 	len *= sizeof(u32) + (struct_v >= 5 ? sizeof(u32) : sizeof(u8));
1705 	ceph_decode_need(p, end, len, e_inval);
1706 	*p += len;
1707 
1708 	/* new_weight */
1709 	ceph_decode_32_safe(p, end, len, e_inval);
1710 	while (len--) {
1711 		s32 osd;
1712 		u32 w;
1713 
1714 		ceph_decode_need(p, end, 2*sizeof(u32), e_inval);
1715 		osd = ceph_decode_32(p);
1716 		w = ceph_decode_32(p);
1717 		BUG_ON(osd >= map->max_osd);
1718 		pr_info("osd%d weight 0x%x %s\n", osd, w,
1719 		     w == CEPH_OSD_IN ? "(in)" :
1720 		     (w == CEPH_OSD_OUT ? "(out)" : ""));
1721 		map->osd_weight[osd] = w;
1722 
1723 		/*
1724 		 * If we are marking in, set the EXISTS, and clear the
1725 		 * AUTOOUT and NEW bits.
1726 		 */
1727 		if (w) {
1728 			map->osd_state[osd] |= CEPH_OSD_EXISTS;
1729 			map->osd_state[osd] &= ~(CEPH_OSD_AUTOOUT |
1730 						 CEPH_OSD_NEW);
1731 		}
1732 	}
1733 	new_weight_end = *p;
1734 
1735 	/* new_state (up/down) */
1736 	*p = new_state;
1737 	len = ceph_decode_32(p);
1738 	while (len--) {
1739 		s32 osd;
1740 		u32 xorstate;
1741 		int ret;
1742 
1743 		osd = ceph_decode_32(p);
1744 		if (struct_v >= 5)
1745 			xorstate = ceph_decode_32(p);
1746 		else
1747 			xorstate = ceph_decode_8(p);
1748 		if (xorstate == 0)
1749 			xorstate = CEPH_OSD_UP;
1750 		BUG_ON(osd >= map->max_osd);
1751 		if ((map->osd_state[osd] & CEPH_OSD_UP) &&
1752 		    (xorstate & CEPH_OSD_UP))
1753 			pr_info("osd%d down\n", osd);
1754 		if ((map->osd_state[osd] & CEPH_OSD_EXISTS) &&
1755 		    (xorstate & CEPH_OSD_EXISTS)) {
1756 			pr_info("osd%d does not exist\n", osd);
1757 			ret = set_primary_affinity(map, osd,
1758 						   CEPH_OSD_DEFAULT_PRIMARY_AFFINITY);
1759 			if (ret)
1760 				return ret;
1761 			memset(map->osd_addr + osd, 0, sizeof(*map->osd_addr));
1762 			map->osd_state[osd] = 0;
1763 		} else {
1764 			map->osd_state[osd] ^= xorstate;
1765 		}
1766 	}
1767 
1768 	/* new_up_client */
1769 	*p = new_up_client;
1770 	len = ceph_decode_32(p);
1771 	while (len--) {
1772 		s32 osd;
1773 		struct ceph_entity_addr addr;
1774 
1775 		osd = ceph_decode_32(p);
1776 		BUG_ON(osd >= map->max_osd);
1777 		if (ceph_decode_entity_addr(p, end, &addr))
1778 			goto e_inval;
1779 		pr_info("osd%d up\n", osd);
1780 		map->osd_state[osd] |= CEPH_OSD_EXISTS | CEPH_OSD_UP;
1781 		map->osd_addr[osd] = addr;
1782 	}
1783 
1784 	*p = new_weight_end;
1785 	return 0;
1786 
1787 e_inval:
1788 	return -EINVAL;
1789 }
1790 
1791 /*
1792  * decode and apply an incremental map update.
1793  */
1794 struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
1795 					     struct ceph_osdmap *map)
1796 {
1797 	struct ceph_fsid fsid;
1798 	u32 epoch = 0;
1799 	struct ceph_timespec modified;
1800 	s32 len;
1801 	u64 pool;
1802 	__s64 new_pool_max;
1803 	__s32 new_flags, max;
1804 	void *start = *p;
1805 	int err;
1806 	u8 struct_v;
1807 
1808 	dout("%s %p to %p len %d\n", __func__, *p, end, (int)(end - *p));
1809 
1810 	err = get_osdmap_client_data_v(p, end, "inc", &struct_v);
1811 	if (err)
1812 		goto bad;
1813 
1814 	/* fsid, epoch, modified, new_pool_max, new_flags */
1815 	ceph_decode_need(p, end, sizeof(fsid) + sizeof(u32) + sizeof(modified) +
1816 			 sizeof(u64) + sizeof(u32), e_inval);
1817 	ceph_decode_copy(p, &fsid, sizeof(fsid));
1818 	epoch = ceph_decode_32(p);
1819 	BUG_ON(epoch != map->epoch+1);
1820 	ceph_decode_copy(p, &modified, sizeof(modified));
1821 	new_pool_max = ceph_decode_64(p);
1822 	new_flags = ceph_decode_32(p);
1823 
1824 	/* full map? */
1825 	ceph_decode_32_safe(p, end, len, e_inval);
1826 	if (len > 0) {
1827 		dout("apply_incremental full map len %d, %p to %p\n",
1828 		     len, *p, end);
1829 		return ceph_osdmap_decode(p, min(*p+len, end));
1830 	}
1831 
1832 	/* new crush? */
1833 	ceph_decode_32_safe(p, end, len, e_inval);
1834 	if (len > 0) {
1835 		err = osdmap_set_crush(map,
1836 				       crush_decode(*p, min(*p + len, end)));
1837 		if (err)
1838 			goto bad;
1839 		*p += len;
1840 	}
1841 
1842 	/* new flags? */
1843 	if (new_flags >= 0)
1844 		map->flags = new_flags;
1845 	if (new_pool_max >= 0)
1846 		map->pool_max = new_pool_max;
1847 
1848 	/* new max? */
1849 	ceph_decode_32_safe(p, end, max, e_inval);
1850 	if (max >= 0) {
1851 		err = osdmap_set_max_osd(map, max);
1852 		if (err)
1853 			goto bad;
1854 	}
1855 
1856 	map->epoch++;
1857 	map->modified = modified;
1858 
1859 	/* new_pools */
1860 	err = decode_new_pools(p, end, map);
1861 	if (err)
1862 		goto bad;
1863 
1864 	/* new_pool_names */
1865 	err = decode_pool_names(p, end, map);
1866 	if (err)
1867 		goto bad;
1868 
1869 	/* old_pool */
1870 	ceph_decode_32_safe(p, end, len, e_inval);
1871 	while (len--) {
1872 		struct ceph_pg_pool_info *pi;
1873 
1874 		ceph_decode_64_safe(p, end, pool, e_inval);
1875 		pi = lookup_pg_pool(&map->pg_pools, pool);
1876 		if (pi)
1877 			__remove_pg_pool(&map->pg_pools, pi);
1878 	}
1879 
1880 	/* new_up_client, new_state, new_weight */
1881 	err = decode_new_up_state_weight(p, end, struct_v, map);
1882 	if (err)
1883 		goto bad;
1884 
1885 	/* new_pg_temp */
1886 	err = decode_new_pg_temp(p, end, map);
1887 	if (err)
1888 		goto bad;
1889 
1890 	/* new_primary_temp */
1891 	if (struct_v >= 1) {
1892 		err = decode_new_primary_temp(p, end, map);
1893 		if (err)
1894 			goto bad;
1895 	}
1896 
1897 	/* new_primary_affinity */
1898 	if (struct_v >= 2) {
1899 		err = decode_new_primary_affinity(p, end, map);
1900 		if (err)
1901 			goto bad;
1902 	}
1903 
1904 	if (struct_v >= 3) {
1905 		/* new_erasure_code_profiles */
1906 		ceph_decode_skip_map_of_map(p, end, string, string, string,
1907 					    e_inval);
1908 		/* old_erasure_code_profiles */
1909 		ceph_decode_skip_set(p, end, string, e_inval);
1910 	}
1911 
1912 	if (struct_v >= 4) {
1913 		err = decode_new_pg_upmap(p, end, map);
1914 		if (err)
1915 			goto bad;
1916 
1917 		err = decode_old_pg_upmap(p, end, map);
1918 		if (err)
1919 			goto bad;
1920 
1921 		err = decode_new_pg_upmap_items(p, end, map);
1922 		if (err)
1923 			goto bad;
1924 
1925 		err = decode_old_pg_upmap_items(p, end, map);
1926 		if (err)
1927 			goto bad;
1928 	}
1929 
1930 	/* ignore the rest */
1931 	*p = end;
1932 
1933 	dout("inc osdmap epoch %d max_osd %d\n", map->epoch, map->max_osd);
1934 	return map;
1935 
1936 e_inval:
1937 	err = -EINVAL;
1938 bad:
1939 	pr_err("corrupt inc osdmap (%d) epoch %d off %d (%p of %p-%p)\n",
1940 	       err, epoch, (int)(*p - start), *p, start, end);
1941 	print_hex_dump(KERN_DEBUG, "osdmap: ",
1942 		       DUMP_PREFIX_OFFSET, 16, 1,
1943 		       start, end - start, true);
1944 	return ERR_PTR(err);
1945 }
1946 
1947 void ceph_oloc_copy(struct ceph_object_locator *dest,
1948 		    const struct ceph_object_locator *src)
1949 {
1950 	ceph_oloc_destroy(dest);
1951 
1952 	dest->pool = src->pool;
1953 	if (src->pool_ns)
1954 		dest->pool_ns = ceph_get_string(src->pool_ns);
1955 	else
1956 		dest->pool_ns = NULL;
1957 }
1958 EXPORT_SYMBOL(ceph_oloc_copy);
1959 
/*
 * Release what @oloc holds: drops its reference on the pool namespace
 * string via ceph_put_string().  The locator itself is not freed.
 */
void ceph_oloc_destroy(struct ceph_object_locator *oloc)
{
	ceph_put_string(oloc->pool_ns);
}
EXPORT_SYMBOL(ceph_oloc_destroy);
1965 
1966 void ceph_oid_copy(struct ceph_object_id *dest,
1967 		   const struct ceph_object_id *src)
1968 {
1969 	ceph_oid_destroy(dest);
1970 
1971 	if (src->name != src->inline_name) {
1972 		/* very rare, see ceph_object_id definition */
1973 		dest->name = kmalloc(src->name_len + 1,
1974 				     GFP_NOIO | __GFP_NOFAIL);
1975 	} else {
1976 		dest->name = dest->inline_name;
1977 	}
1978 	memcpy(dest->name, src->name, src->name_len + 1);
1979 	dest->name_len = src->name_len;
1980 }
1981 EXPORT_SYMBOL(ceph_oid_copy);
1982 
1983 static __printf(2, 0)
1984 int oid_printf_vargs(struct ceph_object_id *oid, const char *fmt, va_list ap)
1985 {
1986 	int len;
1987 
1988 	WARN_ON(!ceph_oid_empty(oid));
1989 
1990 	len = vsnprintf(oid->inline_name, sizeof(oid->inline_name), fmt, ap);
1991 	if (len >= sizeof(oid->inline_name))
1992 		return len;
1993 
1994 	oid->name_len = len;
1995 	return 0;
1996 }
1997 
1998 /*
1999  * If oid doesn't fit into inline buffer, BUG.
2000  */
2001 void ceph_oid_printf(struct ceph_object_id *oid, const char *fmt, ...)
2002 {
2003 	va_list ap;
2004 
2005 	va_start(ap, fmt);
2006 	BUG_ON(oid_printf_vargs(oid, fmt, ap));
2007 	va_end(ap);
2008 }
2009 EXPORT_SYMBOL(ceph_oid_printf);
2010 
2011 static __printf(3, 0)
2012 int oid_aprintf_vargs(struct ceph_object_id *oid, gfp_t gfp,
2013 		      const char *fmt, va_list ap)
2014 {
2015 	va_list aq;
2016 	int len;
2017 
2018 	va_copy(aq, ap);
2019 	len = oid_printf_vargs(oid, fmt, aq);
2020 	va_end(aq);
2021 
2022 	if (len) {
2023 		char *external_name;
2024 
2025 		external_name = kmalloc(len + 1, gfp);
2026 		if (!external_name)
2027 			return -ENOMEM;
2028 
2029 		oid->name = external_name;
2030 		WARN_ON(vsnprintf(oid->name, len + 1, fmt, ap) != len);
2031 		oid->name_len = len;
2032 	}
2033 
2034 	return 0;
2035 }
2036 
/*
 * If oid doesn't fit into inline buffer, allocate.
 *
 * Varargs front end for oid_aprintf_vargs(); @gfp is passed through for
 * the allocation.  Returns what oid_aprintf_vargs() returns: 0 on
 * success, -ENOMEM if the allocation fails.
 */
int ceph_oid_aprintf(struct ceph_object_id *oid, gfp_t gfp,
		     const char *fmt, ...)
{
	va_list ap;
	int ret;

	va_start(ap, fmt);
	ret = oid_aprintf_vargs(oid, gfp, fmt, ap);
	va_end(ap);

	return ret;
}
EXPORT_SYMBOL(ceph_oid_aprintf);
2053 
2054 void ceph_oid_destroy(struct ceph_object_id *oid)
2055 {
2056 	if (oid->name != oid->inline_name)
2057 		kfree(oid->name);
2058 }
2059 EXPORT_SYMBOL(ceph_oid_destroy);
2060 
2061 /*
2062  * osds only
2063  */
2064 static bool __osds_equal(const struct ceph_osds *lhs,
2065 			 const struct ceph_osds *rhs)
2066 {
2067 	if (lhs->size == rhs->size &&
2068 	    !memcmp(lhs->osds, rhs->osds, rhs->size * sizeof(rhs->osds[0])))
2069 		return true;
2070 
2071 	return false;
2072 }
2073 
2074 /*
2075  * osds + primary
2076  */
2077 static bool osds_equal(const struct ceph_osds *lhs,
2078 		       const struct ceph_osds *rhs)
2079 {
2080 	if (__osds_equal(lhs, rhs) &&
2081 	    lhs->primary == rhs->primary)
2082 		return true;
2083 
2084 	return false;
2085 }
2086 
2087 static bool osds_valid(const struct ceph_osds *set)
2088 {
2089 	/* non-empty set */
2090 	if (set->size > 0 && set->primary >= 0)
2091 		return true;
2092 
2093 	/* empty can_shift_osds set */
2094 	if (!set->size && set->primary == -1)
2095 		return true;
2096 
2097 	/* empty !can_shift_osds set - all NONE */
2098 	if (set->size > 0 && set->primary == -1) {
2099 		int i;
2100 
2101 		for (i = 0; i < set->size; i++) {
2102 			if (set->osds[i] != CRUSH_ITEM_NONE)
2103 				break;
2104 		}
2105 		if (i == set->size)
2106 			return true;
2107 	}
2108 
2109 	return false;
2110 }
2111 
2112 void ceph_osds_copy(struct ceph_osds *dest, const struct ceph_osds *src)
2113 {
2114 	memcpy(dest->osds, src->osds, src->size * sizeof(src->osds[0]));
2115 	dest->size = src->size;
2116 	dest->primary = src->primary;
2117 }
2118 
2119 bool ceph_pg_is_split(const struct ceph_pg *pgid, u32 old_pg_num,
2120 		      u32 new_pg_num)
2121 {
2122 	int old_bits = calc_bits_of(old_pg_num);
2123 	int old_mask = (1 << old_bits) - 1;
2124 	int n;
2125 
2126 	WARN_ON(pgid->seed >= old_pg_num);
2127 	if (new_pg_num <= old_pg_num)
2128 		return false;
2129 
2130 	for (n = 1; ; n++) {
2131 		int next_bit = n << (old_bits - 1);
2132 		u32 s = next_bit | pgid->seed;
2133 
2134 		if (s < old_pg_num || s == pgid->seed)
2135 			continue;
2136 		if (s >= new_pg_num)
2137 			break;
2138 
2139 		s = ceph_stable_mod(s, old_pg_num, old_mask);
2140 		if (s == pgid->seed)
2141 			return true;
2142 	}
2143 
2144 	return false;
2145 }
2146 
2147 bool ceph_is_new_interval(const struct ceph_osds *old_acting,
2148 			  const struct ceph_osds *new_acting,
2149 			  const struct ceph_osds *old_up,
2150 			  const struct ceph_osds *new_up,
2151 			  int old_size,
2152 			  int new_size,
2153 			  int old_min_size,
2154 			  int new_min_size,
2155 			  u32 old_pg_num,
2156 			  u32 new_pg_num,
2157 			  bool old_sort_bitwise,
2158 			  bool new_sort_bitwise,
2159 			  bool old_recovery_deletes,
2160 			  bool new_recovery_deletes,
2161 			  const struct ceph_pg *pgid)
2162 {
2163 	return !osds_equal(old_acting, new_acting) ||
2164 	       !osds_equal(old_up, new_up) ||
2165 	       old_size != new_size ||
2166 	       old_min_size != new_min_size ||
2167 	       ceph_pg_is_split(pgid, old_pg_num, new_pg_num) ||
2168 	       old_sort_bitwise != new_sort_bitwise ||
2169 	       old_recovery_deletes != new_recovery_deletes;
2170 }
2171 
2172 static int calc_pg_rank(int osd, const struct ceph_osds *acting)
2173 {
2174 	int i;
2175 
2176 	for (i = 0; i < acting->size; i++) {
2177 		if (acting->osds[i] == osd)
2178 			return i;
2179 	}
2180 
2181 	return -1;
2182 }
2183 
2184 static bool primary_changed(const struct ceph_osds *old_acting,
2185 			    const struct ceph_osds *new_acting)
2186 {
2187 	if (!old_acting->size && !new_acting->size)
2188 		return false; /* both still empty */
2189 
2190 	if (!old_acting->size ^ !new_acting->size)
2191 		return true; /* was empty, now not, or vice versa */
2192 
2193 	if (old_acting->primary != new_acting->primary)
2194 		return true; /* primary changed */
2195 
2196 	if (calc_pg_rank(old_acting->primary, old_acting) !=
2197 	    calc_pg_rank(new_acting->primary, new_acting))
2198 		return true;
2199 
2200 	return false; /* same primary (tho replicas may have changed) */
2201 }
2202 
2203 bool ceph_osds_changed(const struct ceph_osds *old_acting,
2204 		       const struct ceph_osds *new_acting,
2205 		       bool any_change)
2206 {
2207 	if (primary_changed(old_acting, new_acting))
2208 		return true;
2209 
2210 	if (any_change && !__osds_equal(old_acting, new_acting))
2211 		return true;
2212 
2213 	return false;
2214 }
2215 
2216 /*
2217  * Map an object into a PG.
2218  *
2219  * Should only be called with target_oid and target_oloc (as opposed to
2220  * base_oid and base_oloc), since tiering isn't taken into account.
2221  */
2222 void __ceph_object_locator_to_pg(struct ceph_pg_pool_info *pi,
2223 				 const struct ceph_object_id *oid,
2224 				 const struct ceph_object_locator *oloc,
2225 				 struct ceph_pg *raw_pgid)
2226 {
2227 	WARN_ON(pi->id != oloc->pool);
2228 
2229 	if (!oloc->pool_ns) {
2230 		raw_pgid->pool = oloc->pool;
2231 		raw_pgid->seed = ceph_str_hash(pi->object_hash, oid->name,
2232 					     oid->name_len);
2233 		dout("%s %s -> raw_pgid %llu.%x\n", __func__, oid->name,
2234 		     raw_pgid->pool, raw_pgid->seed);
2235 	} else {
2236 		char stack_buf[256];
2237 		char *buf = stack_buf;
2238 		int nsl = oloc->pool_ns->len;
2239 		size_t total = nsl + 1 + oid->name_len;
2240 
2241 		if (total > sizeof(stack_buf))
2242 			buf = kmalloc(total, GFP_NOIO | __GFP_NOFAIL);
2243 		memcpy(buf, oloc->pool_ns->str, nsl);
2244 		buf[nsl] = '\037';
2245 		memcpy(buf + nsl + 1, oid->name, oid->name_len);
2246 		raw_pgid->pool = oloc->pool;
2247 		raw_pgid->seed = ceph_str_hash(pi->object_hash, buf, total);
2248 		if (buf != stack_buf)
2249 			kfree(buf);
2250 		dout("%s %s ns %.*s -> raw_pgid %llu.%x\n", __func__,
2251 		     oid->name, nsl, oloc->pool_ns->str,
2252 		     raw_pgid->pool, raw_pgid->seed);
2253 	}
2254 }
2255 
2256 int ceph_object_locator_to_pg(struct ceph_osdmap *osdmap,
2257 			      const struct ceph_object_id *oid,
2258 			      const struct ceph_object_locator *oloc,
2259 			      struct ceph_pg *raw_pgid)
2260 {
2261 	struct ceph_pg_pool_info *pi;
2262 
2263 	pi = ceph_pg_pool_by_id(osdmap, oloc->pool);
2264 	if (!pi)
2265 		return -ENOENT;
2266 
2267 	__ceph_object_locator_to_pg(pi, oid, oloc, raw_pgid);
2268 	return 0;
2269 }
2270 EXPORT_SYMBOL(ceph_object_locator_to_pg);
2271 
2272 /*
2273  * Map a raw PG (full precision ps) into an actual PG.
2274  */
2275 static void raw_pg_to_pg(struct ceph_pg_pool_info *pi,
2276 			 const struct ceph_pg *raw_pgid,
2277 			 struct ceph_pg *pgid)
2278 {
2279 	pgid->pool = raw_pgid->pool;
2280 	pgid->seed = ceph_stable_mod(raw_pgid->seed, pi->pg_num,
2281 				     pi->pg_num_mask);
2282 }
2283 
2284 /*
2285  * Map a raw PG (full precision ps) into a placement ps (placement
2286  * seed).  Include pool id in that value so that different pools don't
2287  * use the same seeds.
2288  */
2289 static u32 raw_pg_to_pps(struct ceph_pg_pool_info *pi,
2290 			 const struct ceph_pg *raw_pgid)
2291 {
2292 	if (pi->flags & CEPH_POOL_FLAG_HASHPSPOOL) {
2293 		/* hash pool id and seed so that pool PGs do not overlap */
2294 		return crush_hash32_2(CRUSH_HASH_RJENKINS1,
2295 				      ceph_stable_mod(raw_pgid->seed,
2296 						      pi->pgp_num,
2297 						      pi->pgp_num_mask),
2298 				      raw_pgid->pool);
2299 	} else {
2300 		/*
2301 		 * legacy behavior: add ps and pool together.  this is
2302 		 * not a great approach because the PGs from each pool
2303 		 * will overlap on top of each other: 0.5 == 1.4 ==
2304 		 * 2.3 == ...
2305 		 */
2306 		return ceph_stable_mod(raw_pgid->seed, pi->pgp_num,
2307 				       pi->pgp_num_mask) +
2308 		       (unsigned)raw_pgid->pool;
2309 	}
2310 }
2311 
2312 /*
2313  * Magic value used for a "default" fallback choose_args, used if the
2314  * crush_choose_arg_map passed to do_crush() does not exist.  If this
2315  * also doesn't exist, fall back to canonical weights.
2316  */
2317 #define CEPH_DEFAULT_CHOOSE_ARGS	-1
2318 
2319 static int do_crush(struct ceph_osdmap *map, int ruleno, int x,
2320 		    int *result, int result_max,
2321 		    const __u32 *weight, int weight_max,
2322 		    s64 choose_args_index)
2323 {
2324 	struct crush_choose_arg_map *arg_map;
2325 	int r;
2326 
2327 	BUG_ON(result_max > CEPH_PG_MAX_SIZE);
2328 
2329 	arg_map = lookup_choose_arg_map(&map->crush->choose_args,
2330 					choose_args_index);
2331 	if (!arg_map)
2332 		arg_map = lookup_choose_arg_map(&map->crush->choose_args,
2333 						CEPH_DEFAULT_CHOOSE_ARGS);
2334 
2335 	mutex_lock(&map->crush_workspace_mutex);
2336 	r = crush_do_rule(map->crush, ruleno, x, result, result_max,
2337 			  weight, weight_max, map->crush_workspace,
2338 			  arg_map ? arg_map->args : NULL);
2339 	mutex_unlock(&map->crush_workspace_mutex);
2340 
2341 	return r;
2342 }
2343 
2344 static void remove_nonexistent_osds(struct ceph_osdmap *osdmap,
2345 				    struct ceph_pg_pool_info *pi,
2346 				    struct ceph_osds *set)
2347 {
2348 	int i;
2349 
2350 	if (ceph_can_shift_osds(pi)) {
2351 		int removed = 0;
2352 
2353 		/* shift left */
2354 		for (i = 0; i < set->size; i++) {
2355 			if (!ceph_osd_exists(osdmap, set->osds[i])) {
2356 				removed++;
2357 				continue;
2358 			}
2359 			if (removed)
2360 				set->osds[i - removed] = set->osds[i];
2361 		}
2362 		set->size -= removed;
2363 	} else {
2364 		/* set dne devices to NONE */
2365 		for (i = 0; i < set->size; i++) {
2366 			if (!ceph_osd_exists(osdmap, set->osds[i]))
2367 				set->osds[i] = CRUSH_ITEM_NONE;
2368 		}
2369 	}
2370 }
2371 
2372 /*
2373  * Calculate raw set (CRUSH output) for given PG and filter out
2374  * nonexistent OSDs.  ->primary is undefined for a raw set.
2375  *
2376  * Placement seed (CRUSH input) is returned through @ppps.
2377  */
2378 static void pg_to_raw_osds(struct ceph_osdmap *osdmap,
2379 			   struct ceph_pg_pool_info *pi,
2380 			   const struct ceph_pg *raw_pgid,
2381 			   struct ceph_osds *raw,
2382 			   u32 *ppps)
2383 {
2384 	u32 pps = raw_pg_to_pps(pi, raw_pgid);
2385 	int ruleno;
2386 	int len;
2387 
2388 	ceph_osds_init(raw);
2389 	if (ppps)
2390 		*ppps = pps;
2391 
2392 	ruleno = crush_find_rule(osdmap->crush, pi->crush_ruleset, pi->type,
2393 				 pi->size);
2394 	if (ruleno < 0) {
2395 		pr_err("no crush rule: pool %lld ruleset %d type %d size %d\n",
2396 		       pi->id, pi->crush_ruleset, pi->type, pi->size);
2397 		return;
2398 	}
2399 
2400 	if (pi->size > ARRAY_SIZE(raw->osds)) {
2401 		pr_err_ratelimited("pool %lld ruleset %d type %d too wide: size %d > %zu\n",
2402 		       pi->id, pi->crush_ruleset, pi->type, pi->size,
2403 		       ARRAY_SIZE(raw->osds));
2404 		return;
2405 	}
2406 
2407 	len = do_crush(osdmap, ruleno, pps, raw->osds, pi->size,
2408 		       osdmap->osd_weight, osdmap->max_osd, pi->id);
2409 	if (len < 0) {
2410 		pr_err("error %d from crush rule %d: pool %lld ruleset %d type %d size %d\n",
2411 		       len, ruleno, pi->id, pi->crush_ruleset, pi->type,
2412 		       pi->size);
2413 		return;
2414 	}
2415 
2416 	raw->size = len;
2417 	remove_nonexistent_osds(osdmap, pi, raw);
2418 }
2419 
2420 /* apply pg_upmap[_items] mappings */
2421 static void apply_upmap(struct ceph_osdmap *osdmap,
2422 			const struct ceph_pg *pgid,
2423 			struct ceph_osds *raw)
2424 {
2425 	struct ceph_pg_mapping *pg;
2426 	int i, j;
2427 
2428 	pg = lookup_pg_mapping(&osdmap->pg_upmap, pgid);
2429 	if (pg) {
2430 		/* make sure targets aren't marked out */
2431 		for (i = 0; i < pg->pg_upmap.len; i++) {
2432 			int osd = pg->pg_upmap.osds[i];
2433 
2434 			if (osd != CRUSH_ITEM_NONE &&
2435 			    osd < osdmap->max_osd &&
2436 			    osdmap->osd_weight[osd] == 0) {
2437 				/* reject/ignore explicit mapping */
2438 				return;
2439 			}
2440 		}
2441 		for (i = 0; i < pg->pg_upmap.len; i++)
2442 			raw->osds[i] = pg->pg_upmap.osds[i];
2443 		raw->size = pg->pg_upmap.len;
2444 		/* check and apply pg_upmap_items, if any */
2445 	}
2446 
2447 	pg = lookup_pg_mapping(&osdmap->pg_upmap_items, pgid);
2448 	if (pg) {
2449 		/*
2450 		 * Note: this approach does not allow a bidirectional swap,
2451 		 * e.g., [[1,2],[2,1]] applied to [0,1,2] -> [0,2,1].
2452 		 */
2453 		for (i = 0; i < pg->pg_upmap_items.len; i++) {
2454 			int from = pg->pg_upmap_items.from_to[i][0];
2455 			int to = pg->pg_upmap_items.from_to[i][1];
2456 			int pos = -1;
2457 			bool exists = false;
2458 
2459 			/* make sure replacement doesn't already appear */
2460 			for (j = 0; j < raw->size; j++) {
2461 				int osd = raw->osds[j];
2462 
2463 				if (osd == to) {
2464 					exists = true;
2465 					break;
2466 				}
2467 				/* ignore mapping if target is marked out */
2468 				if (osd == from && pos < 0 &&
2469 				    !(to != CRUSH_ITEM_NONE &&
2470 				      to < osdmap->max_osd &&
2471 				      osdmap->osd_weight[to] == 0)) {
2472 					pos = j;
2473 				}
2474 			}
2475 			if (!exists && pos >= 0)
2476 				raw->osds[pos] = to;
2477 		}
2478 	}
2479 }
2480 
2481 /*
2482  * Given raw set, calculate up set and up primary.  By definition of an
2483  * up set, the result won't contain nonexistent or down OSDs.
2484  *
2485  * This is done in-place - on return @set is the up set.  If it's
2486  * empty, ->primary will remain undefined.
2487  */
2488 static void raw_to_up_osds(struct ceph_osdmap *osdmap,
2489 			   struct ceph_pg_pool_info *pi,
2490 			   struct ceph_osds *set)
2491 {
2492 	int i;
2493 
2494 	/* ->primary is undefined for a raw set */
2495 	BUG_ON(set->primary != -1);
2496 
2497 	if (ceph_can_shift_osds(pi)) {
2498 		int removed = 0;
2499 
2500 		/* shift left */
2501 		for (i = 0; i < set->size; i++) {
2502 			if (ceph_osd_is_down(osdmap, set->osds[i])) {
2503 				removed++;
2504 				continue;
2505 			}
2506 			if (removed)
2507 				set->osds[i - removed] = set->osds[i];
2508 		}
2509 		set->size -= removed;
2510 		if (set->size > 0)
2511 			set->primary = set->osds[0];
2512 	} else {
2513 		/* set down/dne devices to NONE */
2514 		for (i = set->size - 1; i >= 0; i--) {
2515 			if (ceph_osd_is_down(osdmap, set->osds[i]))
2516 				set->osds[i] = CRUSH_ITEM_NONE;
2517 			else
2518 				set->primary = set->osds[i];
2519 		}
2520 	}
2521 }
2522 
2523 static void apply_primary_affinity(struct ceph_osdmap *osdmap,
2524 				   struct ceph_pg_pool_info *pi,
2525 				   u32 pps,
2526 				   struct ceph_osds *up)
2527 {
2528 	int i;
2529 	int pos = -1;
2530 
2531 	/*
2532 	 * Do we have any non-default primary_affinity values for these
2533 	 * osds?
2534 	 */
2535 	if (!osdmap->osd_primary_affinity)
2536 		return;
2537 
2538 	for (i = 0; i < up->size; i++) {
2539 		int osd = up->osds[i];
2540 
2541 		if (osd != CRUSH_ITEM_NONE &&
2542 		    osdmap->osd_primary_affinity[osd] !=
2543 					CEPH_OSD_DEFAULT_PRIMARY_AFFINITY) {
2544 			break;
2545 		}
2546 	}
2547 	if (i == up->size)
2548 		return;
2549 
2550 	/*
2551 	 * Pick the primary.  Feed both the seed (for the pg) and the
2552 	 * osd into the hash/rng so that a proportional fraction of an
2553 	 * osd's pgs get rejected as primary.
2554 	 */
2555 	for (i = 0; i < up->size; i++) {
2556 		int osd = up->osds[i];
2557 		u32 aff;
2558 
2559 		if (osd == CRUSH_ITEM_NONE)
2560 			continue;
2561 
2562 		aff = osdmap->osd_primary_affinity[osd];
2563 		if (aff < CEPH_OSD_MAX_PRIMARY_AFFINITY &&
2564 		    (crush_hash32_2(CRUSH_HASH_RJENKINS1,
2565 				    pps, osd) >> 16) >= aff) {
2566 			/*
2567 			 * We chose not to use this primary.  Note it
2568 			 * anyway as a fallback in case we don't pick
2569 			 * anyone else, but keep looking.
2570 			 */
2571 			if (pos < 0)
2572 				pos = i;
2573 		} else {
2574 			pos = i;
2575 			break;
2576 		}
2577 	}
2578 	if (pos < 0)
2579 		return;
2580 
2581 	up->primary = up->osds[pos];
2582 
2583 	if (ceph_can_shift_osds(pi) && pos > 0) {
2584 		/* move the new primary to the front */
2585 		for (i = pos; i > 0; i--)
2586 			up->osds[i] = up->osds[i - 1];
2587 		up->osds[0] = up->primary;
2588 	}
2589 }
2590 
2591 /*
2592  * Get pg_temp and primary_temp mappings for given PG.
2593  *
2594  * Note that a PG may have none, only pg_temp, only primary_temp or
2595  * both pg_temp and primary_temp mappings.  This means @temp isn't
2596  * always a valid OSD set on return: in the "only primary_temp" case,
2597  * @temp will have its ->primary >= 0 but ->size == 0.
2598  */
2599 static void get_temp_osds(struct ceph_osdmap *osdmap,
2600 			  struct ceph_pg_pool_info *pi,
2601 			  const struct ceph_pg *pgid,
2602 			  struct ceph_osds *temp)
2603 {
2604 	struct ceph_pg_mapping *pg;
2605 	int i;
2606 
2607 	ceph_osds_init(temp);
2608 
2609 	/* pg_temp? */
2610 	pg = lookup_pg_mapping(&osdmap->pg_temp, pgid);
2611 	if (pg) {
2612 		for (i = 0; i < pg->pg_temp.len; i++) {
2613 			if (ceph_osd_is_down(osdmap, pg->pg_temp.osds[i])) {
2614 				if (ceph_can_shift_osds(pi))
2615 					continue;
2616 
2617 				temp->osds[temp->size++] = CRUSH_ITEM_NONE;
2618 			} else {
2619 				temp->osds[temp->size++] = pg->pg_temp.osds[i];
2620 			}
2621 		}
2622 
2623 		/* apply pg_temp's primary */
2624 		for (i = 0; i < temp->size; i++) {
2625 			if (temp->osds[i] != CRUSH_ITEM_NONE) {
2626 				temp->primary = temp->osds[i];
2627 				break;
2628 			}
2629 		}
2630 	}
2631 
2632 	/* primary_temp? */
2633 	pg = lookup_pg_mapping(&osdmap->primary_temp, pgid);
2634 	if (pg)
2635 		temp->primary = pg->primary_temp.osd;
2636 }
2637 
2638 /*
2639  * Map a PG to its acting set as well as its up set.
2640  *
2641  * Acting set is used for data mapping purposes, while up set can be
2642  * recorded for detecting interval changes and deciding whether to
2643  * resend a request.
2644  */
2645 void ceph_pg_to_up_acting_osds(struct ceph_osdmap *osdmap,
2646 			       struct ceph_pg_pool_info *pi,
2647 			       const struct ceph_pg *raw_pgid,
2648 			       struct ceph_osds *up,
2649 			       struct ceph_osds *acting)
2650 {
2651 	struct ceph_pg pgid;
2652 	u32 pps;
2653 
2654 	WARN_ON(pi->id != raw_pgid->pool);
2655 	raw_pg_to_pg(pi, raw_pgid, &pgid);
2656 
2657 	pg_to_raw_osds(osdmap, pi, raw_pgid, up, &pps);
2658 	apply_upmap(osdmap, &pgid, up);
2659 	raw_to_up_osds(osdmap, pi, up);
2660 	apply_primary_affinity(osdmap, pi, pps, up);
2661 	get_temp_osds(osdmap, pi, &pgid, acting);
2662 	if (!acting->size) {
2663 		memcpy(acting->osds, up->osds, up->size * sizeof(up->osds[0]));
2664 		acting->size = up->size;
2665 		if (acting->primary == -1)
2666 			acting->primary = up->primary;
2667 	}
2668 	WARN_ON(!osds_valid(up) || !osds_valid(acting));
2669 }
2670 
2671 bool ceph_pg_to_primary_shard(struct ceph_osdmap *osdmap,
2672 			      struct ceph_pg_pool_info *pi,
2673 			      const struct ceph_pg *raw_pgid,
2674 			      struct ceph_spg *spgid)
2675 {
2676 	struct ceph_pg pgid;
2677 	struct ceph_osds up, acting;
2678 	int i;
2679 
2680 	WARN_ON(pi->id != raw_pgid->pool);
2681 	raw_pg_to_pg(pi, raw_pgid, &pgid);
2682 
2683 	if (ceph_can_shift_osds(pi)) {
2684 		spgid->pgid = pgid; /* struct */
2685 		spgid->shard = CEPH_SPG_NOSHARD;
2686 		return true;
2687 	}
2688 
2689 	ceph_pg_to_up_acting_osds(osdmap, pi, &pgid, &up, &acting);
2690 	for (i = 0; i < acting.size; i++) {
2691 		if (acting.osds[i] == acting.primary) {
2692 			spgid->pgid = pgid; /* struct */
2693 			spgid->shard = i;
2694 			return true;
2695 		}
2696 	}
2697 
2698 	return false;
2699 }
2700 
2701 /*
2702  * Return acting primary for given PG, or -1 if none.
2703  */
2704 int ceph_pg_to_acting_primary(struct ceph_osdmap *osdmap,
2705 			      const struct ceph_pg *raw_pgid)
2706 {
2707 	struct ceph_pg_pool_info *pi;
2708 	struct ceph_osds up, acting;
2709 
2710 	pi = ceph_pg_pool_by_id(osdmap, raw_pgid->pool);
2711 	if (!pi)
2712 		return -1;
2713 
2714 	ceph_pg_to_up_acting_osds(osdmap, pi, raw_pgid, &up, &acting);
2715 	return acting.primary;
2716 }
2717 EXPORT_SYMBOL(ceph_pg_to_acting_primary);
2718 
2719 static struct crush_loc_node *alloc_crush_loc(size_t type_name_len,
2720 					      size_t name_len)
2721 {
2722 	struct crush_loc_node *loc;
2723 
2724 	loc = kmalloc(sizeof(*loc) + type_name_len + name_len + 2, GFP_NOIO);
2725 	if (!loc)
2726 		return NULL;
2727 
2728 	RB_CLEAR_NODE(&loc->cl_node);
2729 	return loc;
2730 }
2731 
2732 static void free_crush_loc(struct crush_loc_node *loc)
2733 {
2734 	WARN_ON(!RB_EMPTY_NODE(&loc->cl_node));
2735 
2736 	kfree(loc);
2737 }
2738 
2739 static int crush_loc_compare(const struct crush_loc *loc1,
2740 			     const struct crush_loc *loc2)
2741 {
2742 	return strcmp(loc1->cl_type_name, loc2->cl_type_name) ?:
2743 	       strcmp(loc1->cl_name, loc2->cl_name);
2744 }
2745 
2746 DEFINE_RB_FUNCS2(crush_loc, struct crush_loc_node, cl_loc, crush_loc_compare,
2747 		 RB_BYPTR, const struct crush_loc *, cl_node)
2748 
2749 /*
2750  * Parses a set of <bucket type name>':'<bucket name> pairs separated
2751  * by '|', e.g. "rack:foo1|rack:foo2|datacenter:bar".
2752  *
2753  * Note that @crush_location is modified by strsep().
2754  */
2755 int ceph_parse_crush_location(char *crush_location, struct rb_root *locs)
2756 {
2757 	struct crush_loc_node *loc;
2758 	const char *type_name, *name, *colon;
2759 	size_t type_name_len, name_len;
2760 
2761 	dout("%s '%s'\n", __func__, crush_location);
2762 	while ((type_name = strsep(&crush_location, "|"))) {
2763 		colon = strchr(type_name, ':');
2764 		if (!colon)
2765 			return -EINVAL;
2766 
2767 		type_name_len = colon - type_name;
2768 		if (type_name_len == 0)
2769 			return -EINVAL;
2770 
2771 		name = colon + 1;
2772 		name_len = strlen(name);
2773 		if (name_len == 0)
2774 			return -EINVAL;
2775 
2776 		loc = alloc_crush_loc(type_name_len, name_len);
2777 		if (!loc)
2778 			return -ENOMEM;
2779 
2780 		loc->cl_loc.cl_type_name = loc->cl_data;
2781 		memcpy(loc->cl_loc.cl_type_name, type_name, type_name_len);
2782 		loc->cl_loc.cl_type_name[type_name_len] = '\0';
2783 
2784 		loc->cl_loc.cl_name = loc->cl_data + type_name_len + 1;
2785 		memcpy(loc->cl_loc.cl_name, name, name_len);
2786 		loc->cl_loc.cl_name[name_len] = '\0';
2787 
2788 		if (!__insert_crush_loc(locs, loc)) {
2789 			free_crush_loc(loc);
2790 			return -EEXIST;
2791 		}
2792 
2793 		dout("%s type_name '%s' name '%s'\n", __func__,
2794 		     loc->cl_loc.cl_type_name, loc->cl_loc.cl_name);
2795 	}
2796 
2797 	return 0;
2798 }
2799 
2800 int ceph_compare_crush_locs(struct rb_root *locs1, struct rb_root *locs2)
2801 {
2802 	struct rb_node *n1 = rb_first(locs1);
2803 	struct rb_node *n2 = rb_first(locs2);
2804 	int ret;
2805 
2806 	for ( ; n1 && n2; n1 = rb_next(n1), n2 = rb_next(n2)) {
2807 		struct crush_loc_node *loc1 =
2808 		    rb_entry(n1, struct crush_loc_node, cl_node);
2809 		struct crush_loc_node *loc2 =
2810 		    rb_entry(n2, struct crush_loc_node, cl_node);
2811 
2812 		ret = crush_loc_compare(&loc1->cl_loc, &loc2->cl_loc);
2813 		if (ret)
2814 			return ret;
2815 	}
2816 
2817 	if (!n1 && n2)
2818 		return -1;
2819 	if (n1 && !n2)
2820 		return 1;
2821 	return 0;
2822 }
2823 
2824 void ceph_clear_crush_locs(struct rb_root *locs)
2825 {
2826 	while (!RB_EMPTY_ROOT(locs)) {
2827 		struct crush_loc_node *loc =
2828 		    rb_entry(rb_first(locs), struct crush_loc_node, cl_node);
2829 
2830 		erase_crush_loc(locs, loc);
2831 		free_crush_loc(loc);
2832 	}
2833 }
2834 
/*
 * Returns true if @name matches [a-zA-Z0-9-_.]+
 *
 * The first character is examined before the terminator is looked
 * for, so an empty string is rejected.
 */
static bool is_valid_crush_name(const char *name)
{
	const char *p = name;

	for (;;) {
		char c = *p++;
		bool ok = (c >= 'a' && c <= 'z') ||
			  (c >= 'A' && c <= 'Z') ||
			  (c >= '0' && c <= '9') ||
			  c == '-' || c == '_' || c == '.';

		if (!ok)
			return false;
		if (*p == '\0')
			return true;
	}
}
2850 
2851 /*
2852  * Gets the parent of an item.  Returns its id (<0 because the
2853  * parent is always a bucket), type id (>0 for the same reason,
2854  * via @parent_type_id) and location (via @parent_loc).  If no
2855  * parent, returns 0.
2856  *
2857  * Does a linear search, as there are no parent pointers of any
2858  * kind.  Note that the result is ambigous for items that occur
2859  * multiple times in the map.
2860  */
2861 static int get_immediate_parent(struct crush_map *c, int id,
2862 				u16 *parent_type_id,
2863 				struct crush_loc *parent_loc)
2864 {
2865 	struct crush_bucket *b;
2866 	struct crush_name_node *type_cn, *cn;
2867 	int i, j;
2868 
2869 	for (i = 0; i < c->max_buckets; i++) {
2870 		b = c->buckets[i];
2871 		if (!b)
2872 			continue;
2873 
2874 		/* ignore per-class shadow hierarchy */
2875 		cn = lookup_crush_name(&c->names, b->id);
2876 		if (!cn || !is_valid_crush_name(cn->cn_name))
2877 			continue;
2878 
2879 		for (j = 0; j < b->size; j++) {
2880 			if (b->items[j] != id)
2881 				continue;
2882 
2883 			*parent_type_id = b->type;
2884 			type_cn = lookup_crush_name(&c->type_names, b->type);
2885 			parent_loc->cl_type_name = type_cn->cn_name;
2886 			parent_loc->cl_name = cn->cn_name;
2887 			return b->id;
2888 		}
2889 	}
2890 
2891 	return 0;  /* no parent */
2892 }
2893 
2894 /*
2895  * Calculates the locality/distance from an item to a client
2896  * location expressed in terms of CRUSH hierarchy as a set of
2897  * (bucket type name, bucket name) pairs.  Specifically, looks
2898  * for the lowest-valued bucket type for which the location of
2899  * @id matches one of the locations in @locs, so for standard
2900  * bucket types (host = 1, rack = 3, datacenter = 8, zone = 9)
2901  * a matching host is closer than a matching rack and a matching
2902  * data center is closer than a matching zone.
2903  *
2904  * Specifying multiple locations (a "multipath" location) such
2905  * as "rack=foo1 rack=foo2 datacenter=bar" is allowed -- @locs
2906  * is a multimap.  The locality will be:
2907  *
2908  * - 3 for OSDs in racks foo1 and foo2
2909  * - 8 for OSDs in data center bar
2910  * - -1 for all other OSDs
2911  *
2912  * The lowest possible bucket type is 1, so the best locality
2913  * for an OSD is 1 (i.e. a matching host).  Locality 0 would be
2914  * the OSD itself.
2915  */
2916 int ceph_get_crush_locality(struct ceph_osdmap *osdmap, int id,
2917 			    struct rb_root *locs)
2918 {
2919 	struct crush_loc loc;
2920 	u16 type_id;
2921 
2922 	/*
2923 	 * Instead of repeated get_immediate_parent() calls,
2924 	 * the location of @id could be obtained with a single
2925 	 * depth-first traversal.
2926 	 */
2927 	for (;;) {
2928 		id = get_immediate_parent(osdmap->crush, id, &type_id, &loc);
2929 		if (id >= 0)
2930 			return -1;  /* not local */
2931 
2932 		if (lookup_crush_loc(locs, &loc))
2933 			return type_id;
2934 	}
2935 }
2936