xref: /linux/net/sunrpc/xprtrdma/svc_rdma_pcl.c (revision a460513ed4b6994bfeb7bd86f72853140bc1ac12)
1 // SPDX-License-Identifier: GPL-2.0
2 /*
3  * Copyright (c) 2020 Oracle. All rights reserved.
4  */
5 
6 #include <linux/sunrpc/svc_rdma.h>
7 #include <linux/sunrpc/rpc_rdma.h>
8 
9 #include "xprt_rdma.h"
10 #include <trace/events/rpcrdma.h>
11 
12 /**
13  * pcl_free - Release all memory associated with a parsed chunk list
14  * @pcl: parsed chunk list
15  *
16  */
17 void pcl_free(struct svc_rdma_pcl *pcl)
18 {
19 	while (!list_empty(&pcl->cl_chunks)) {
20 		struct svc_rdma_chunk *chunk;
21 
22 		chunk = pcl_first_chunk(pcl);
23 		list_del(&chunk->ch_list);
24 		kfree(chunk);
25 	}
26 }
27 
28 static struct svc_rdma_chunk *pcl_alloc_chunk(u32 segcount, u32 position)
29 {
30 	struct svc_rdma_chunk *chunk;
31 
32 	chunk = kmalloc(struct_size(chunk, ch_segments, segcount), GFP_KERNEL);
33 	if (!chunk)
34 		return NULL;
35 
36 	chunk->ch_position = position;
37 	chunk->ch_length = 0;
38 	chunk->ch_payload_length = 0;
39 	chunk->ch_segcount = 0;
40 	return chunk;
41 }
42 
43 static struct svc_rdma_chunk *
44 pcl_lookup_position(struct svc_rdma_pcl *pcl, u32 position)
45 {
46 	struct svc_rdma_chunk *pos;
47 
48 	pcl_for_each_chunk(pos, pcl) {
49 		if (pos->ch_position == position)
50 			return pos;
51 	}
52 	return NULL;
53 }
54 
55 static void pcl_insert_position(struct svc_rdma_pcl *pcl,
56 				struct svc_rdma_chunk *chunk)
57 {
58 	struct svc_rdma_chunk *pos;
59 
60 	pcl_for_each_chunk(pos, pcl) {
61 		if (pos->ch_position > chunk->ch_position)
62 			break;
63 	}
64 	__list_add(&chunk->ch_list, pos->ch_list.prev, &pos->ch_list);
65 	pcl->cl_count++;
66 }
67 
68 static void pcl_set_read_segment(const struct svc_rdma_recv_ctxt *rctxt,
69 				 struct svc_rdma_chunk *chunk,
70 				 u32 handle, u32 length, u64 offset)
71 {
72 	struct svc_rdma_segment *segment;
73 
74 	segment = &chunk->ch_segments[chunk->ch_segcount];
75 	segment->rs_handle = handle;
76 	segment->rs_length = length;
77 	segment->rs_offset = offset;
78 
79 	trace_svcrdma_decode_rseg(&rctxt->rc_cid, chunk, segment);
80 
81 	chunk->ch_length += length;
82 	chunk->ch_segcount++;
83 }
84 
85 /**
86  * pcl_alloc_call - Construct a parsed chunk list for the Call body
87  * @rctxt: Ingress receive context
88  * @p: Start of an un-decoded Read list
89  *
90  * Assumptions:
91  * - The incoming Read list has already been sanity checked.
92  * - cl_count is already set to the number of segments in
93  *   the un-decoded list.
94  * - The list might not be in order by position.
95  *
96  * Return values:
97  *       %true: Parsed chunk list was successfully constructed, and
98  *              cl_count is updated to be the number of chunks (ie.
99  *              unique positions) in the Read list.
100  *      %false: Memory allocation failed.
101  */
102 bool pcl_alloc_call(struct svc_rdma_recv_ctxt *rctxt, __be32 *p)
103 {
104 	struct svc_rdma_pcl *pcl = &rctxt->rc_call_pcl;
105 	unsigned int i, segcount = pcl->cl_count;
106 
107 	pcl->cl_count = 0;
108 	for (i = 0; i < segcount; i++) {
109 		struct svc_rdma_chunk *chunk;
110 		u32 position, handle, length;
111 		u64 offset;
112 
113 		p++;	/* skip the list discriminator */
114 		p = xdr_decode_read_segment(p, &position, &handle,
115 					    &length, &offset);
116 		if (position != 0)
117 			continue;
118 
119 		if (pcl_is_empty(pcl)) {
120 			chunk = pcl_alloc_chunk(segcount, position);
121 			if (!chunk)
122 				return false;
123 			pcl_insert_position(pcl, chunk);
124 		} else {
125 			chunk = list_first_entry(&pcl->cl_chunks,
126 						 struct svc_rdma_chunk,
127 						 ch_list);
128 		}
129 
130 		pcl_set_read_segment(rctxt, chunk, handle, length, offset);
131 	}
132 
133 	return true;
134 }
135 
136 /**
137  * pcl_alloc_read - Construct a parsed chunk list for normal Read chunks
138  * @rctxt: Ingress receive context
139  * @p: Start of an un-decoded Read list
140  *
141  * Assumptions:
142  * - The incoming Read list has already been sanity checked.
143  * - cl_count is already set to the number of segments in
144  *   the un-decoded list.
145  * - The list might not be in order by position.
146  *
147  * Return values:
148  *       %true: Parsed chunk list was successfully constructed, and
149  *              cl_count is updated to be the number of chunks (ie.
150  *              unique position values) in the Read list.
151  *      %false: Memory allocation failed.
152  *
153  * TODO:
154  * - Check for chunk range overlaps
155  */
156 bool pcl_alloc_read(struct svc_rdma_recv_ctxt *rctxt, __be32 *p)
157 {
158 	struct svc_rdma_pcl *pcl = &rctxt->rc_read_pcl;
159 	unsigned int i, segcount = pcl->cl_count;
160 
161 	pcl->cl_count = 0;
162 	for (i = 0; i < segcount; i++) {
163 		struct svc_rdma_chunk *chunk;
164 		u32 position, handle, length;
165 		u64 offset;
166 
167 		p++;	/* skip the list discriminator */
168 		p = xdr_decode_read_segment(p, &position, &handle,
169 					    &length, &offset);
170 		if (position == 0)
171 			continue;
172 
173 		chunk = pcl_lookup_position(pcl, position);
174 		if (!chunk) {
175 			chunk = pcl_alloc_chunk(segcount, position);
176 			if (!chunk)
177 				return false;
178 			pcl_insert_position(pcl, chunk);
179 		}
180 
181 		pcl_set_read_segment(rctxt, chunk, handle, length, offset);
182 	}
183 
184 	return true;
185 }
186 
187 /**
188  * pcl_alloc_write - Construct a parsed chunk list from a Write list
189  * @rctxt: Ingress receive context
190  * @pcl: Parsed chunk list to populate
191  * @p: Start of an un-decoded Write list
192  *
193  * Assumptions:
194  * - The incoming Write list has already been sanity checked, and
195  * - cl_count is set to the number of chunks in the un-decoded list.
196  *
197  * Return values:
198  *       %true: Parsed chunk list was successfully constructed.
199  *      %false: Memory allocation failed.
200  */
201 bool pcl_alloc_write(struct svc_rdma_recv_ctxt *rctxt,
202 		     struct svc_rdma_pcl *pcl, __be32 *p)
203 {
204 	struct svc_rdma_segment *segment;
205 	struct svc_rdma_chunk *chunk;
206 	unsigned int i, j;
207 	u32 segcount;
208 
209 	for (i = 0; i < pcl->cl_count; i++) {
210 		p++;	/* skip the list discriminator */
211 		segcount = be32_to_cpup(p++);
212 
213 		chunk = pcl_alloc_chunk(segcount, 0);
214 		if (!chunk)
215 			return false;
216 		list_add_tail(&chunk->ch_list, &pcl->cl_chunks);
217 
218 		for (j = 0; j < segcount; j++) {
219 			segment = &chunk->ch_segments[j];
220 			p = xdr_decode_rdma_segment(p, &segment->rs_handle,
221 						    &segment->rs_length,
222 						    &segment->rs_offset);
223 			trace_svcrdma_decode_wseg(&rctxt->rc_cid, chunk, j);
224 
225 			chunk->ch_length += segment->rs_length;
226 			chunk->ch_segcount++;
227 		}
228 	}
229 	return true;
230 }
231 
232 static int pcl_process_region(const struct xdr_buf *xdr,
233 			      unsigned int offset, unsigned int length,
234 			      int (*actor)(const struct xdr_buf *, void *),
235 			      void *data)
236 {
237 	struct xdr_buf subbuf;
238 
239 	if (!length)
240 		return 0;
241 	if (xdr_buf_subsegment(xdr, &subbuf, offset, length))
242 		return -EMSGSIZE;
243 	return actor(&subbuf, data);
244 }
245 
246 /**
247  * pcl_process_nonpayloads - Process non-payload regions inside @xdr
248  * @pcl: Chunk list to process
249  * @xdr: xdr_buf to process
250  * @actor: Function to invoke on each non-payload region
251  * @data: Arguments for @actor
252  *
253  * This mechanism must ignore not only result payloads that were already
254  * sent via RDMA Write, but also XDR padding for those payloads that
255  * the upper layer has added.
256  *
257  * Assumptions:
258  *  The xdr->len and ch_position fields are aligned to 4-byte multiples.
259  *
260  * Returns:
261  *   On success, zero,
262  *   %-EMSGSIZE on XDR buffer overflow, or
263  *   The return value of @actor
264  */
265 int pcl_process_nonpayloads(const struct svc_rdma_pcl *pcl,
266 			    const struct xdr_buf *xdr,
267 			    int (*actor)(const struct xdr_buf *, void *),
268 			    void *data)
269 {
270 	struct svc_rdma_chunk *chunk, *next;
271 	unsigned int start;
272 	int ret;
273 
274 	chunk = pcl_first_chunk(pcl);
275 
276 	/* No result payloads were generated */
277 	if (!chunk || !chunk->ch_payload_length)
278 		return actor(xdr, data);
279 
280 	/* Process the region before the first result payload */
281 	ret = pcl_process_region(xdr, 0, chunk->ch_position, actor, data);
282 	if (ret < 0)
283 		return ret;
284 
285 	/* Process the regions between each middle result payload */
286 	while ((next = pcl_next_chunk(pcl, chunk))) {
287 		if (!next->ch_payload_length)
288 			break;
289 
290 		start = pcl_chunk_end_offset(chunk);
291 		ret = pcl_process_region(xdr, start, next->ch_position - start,
292 					 actor, data);
293 		if (ret < 0)
294 			return ret;
295 
296 		chunk = next;
297 	}
298 
299 	/* Process the region after the last result payload */
300 	start = pcl_chunk_end_offset(chunk);
301 	ret = pcl_process_region(xdr, start, xdr->len - start, actor, data);
302 	if (ret < 0)
303 		return ret;
304 
305 	return 0;
306 }
307