xref: /illumos-gate/usr/src/cmd/sort/streams.c (revision 101e15b5f8a77d9433805e541996abaabc9ca8c1)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 
26 #include "streams.h"
27 
28 static const stream_ops_t invalid_ops = {
29 	NULL,
30 	NULL,
31 	NULL,
32 	NULL,
33 	NULL,
34 	NULL,
35 	NULL,
36 	NULL,
37 	NULL,
38 	NULL
39 };
40 
41 stream_t *
stream_new(int src)42 stream_new(int src)
43 {
44 	stream_t *str = safe_realloc(NULL, sizeof (stream_t));
45 
46 	stream_clear(str);
47 	stream_set(str, src);
48 
49 	return (str);
50 }
51 
52 void
stream_set(stream_t * str,flag_t flags)53 stream_set(stream_t *str, flag_t flags)
54 {
55 	if (flags & STREAM_SOURCE_MASK) {
56 		ASSERT((flags & STREAM_SOURCE_MASK) == STREAM_ARRAY ||
57 		    (flags & STREAM_SOURCE_MASK) == STREAM_SINGLE ||
58 		    (flags & STREAM_SOURCE_MASK) == STREAM_MMAP ||
59 		    (flags & STREAM_SOURCE_MASK) == STREAM_WIDE);
60 
61 		str->s_status &= ~STREAM_SOURCE_MASK;
62 		str->s_status |= flags & STREAM_SOURCE_MASK;
63 
64 		switch (flags & STREAM_SOURCE_MASK) {
65 			case STREAM_NO_SOURCE:
66 				str->s_element_size = 0;
67 				str->s_ops = invalid_ops;
68 				return;
69 			case STREAM_ARRAY:
70 				/*
71 				 * Array streams inherit element size.
72 				 */
73 				str->s_ops = stream_array_ops;
74 				break;
75 			case STREAM_MMAP:
76 				str->s_element_size = sizeof (char);
77 				str->s_ops = stream_mmap_ops;
78 				break;
79 			case STREAM_SINGLE:
80 				str->s_element_size = sizeof (char);
81 				str->s_ops = stream_stdio_ops;
82 				break;
83 			case STREAM_WIDE:
84 				str->s_element_size = sizeof (wchar_t);
85 				str->s_ops = stream_wide_ops;
86 				break;
87 			default:
88 				die(EMSG_UNKN_STREAM, str->s_status);
89 		}
90 	}
91 
92 	str->s_status |= (flags & ~STREAM_SOURCE_MASK);
93 
94 	if (str->s_status & STREAM_UNIQUE)
95 		switch (str->s_status & STREAM_SOURCE_MASK) {
96 			case STREAM_SINGLE :
97 				str->s_ops.sop_put_line =
98 				    stream_stdio_put_line_unique;
99 				break;
100 			case STREAM_WIDE :
101 				str->s_ops.sop_put_line =
102 				    stream_wide_put_line_unique;
103 				break;
104 			default :
105 				break;
106 		}
107 
108 	if (str->s_status & STREAM_INSTANT)
109 		switch (str->s_status & STREAM_SOURCE_MASK) {
110 			case STREAM_SINGLE :
111 				str->s_ops.sop_fetch =
112 				    stream_stdio_fetch_overwrite;
113 				break;
114 			case STREAM_WIDE :
115 				str->s_ops.sop_fetch =
116 				    stream_wide_fetch_overwrite;
117 				break;
118 			default :
119 				break;
120 		}
121 }
122 
123 void
stream_unset(stream_t * streamp,flag_t flags)124 stream_unset(stream_t *streamp, flag_t flags)
125 {
126 	ASSERT(!(flags & STREAM_SOURCE_MASK));
127 
128 	streamp->s_status &= ~(flags & ~STREAM_SOURCE_MASK);
129 }
130 
131 int
stream_is_primed(stream_t * streamp)132 stream_is_primed(stream_t *streamp)
133 {
134 	return (streamp->s_status & STREAM_PRIMED);
135 }
136 
137 void
stream_clear(stream_t * str)138 stream_clear(stream_t *str)
139 {
140 	(void) memset(str, 0, sizeof (stream_t));
141 }
142 
143 static void
stream_copy(stream_t * dest,stream_t * src)144 stream_copy(stream_t *dest, stream_t *src)
145 {
146 	(void) memcpy(dest, src, sizeof (stream_t));
147 }
148 
149 void
stream_stat_chain(stream_t * strp)150 stream_stat_chain(stream_t *strp)
151 {
152 	struct stat buf;
153 	stream_t *cur_strp = strp;
154 
155 	while (cur_strp != NULL) {
156 		if (cur_strp->s_status & STREAM_NOTFILE ||
157 		    cur_strp->s_status & STREAM_ARRAY) {
158 			cur_strp = cur_strp->s_next;
159 			continue;
160 		}
161 
162 		if (stat(cur_strp->s_filename, &buf) < 0)
163 			die(EMSG_STAT, cur_strp->s_filename);
164 
165 		cur_strp->s_dev = buf.st_dev;
166 		cur_strp->s_ino = buf.st_ino;
167 		cur_strp->s_filesize = buf.st_size;
168 
169 		cur_strp = cur_strp->s_next;
170 	}
171 }
172 
173 uint_t
stream_count_chain(stream_t * str)174 stream_count_chain(stream_t *str)
175 {
176 	uint_t n = 0;
177 
178 	while (str != NULL) {
179 		n++;
180 		str = str->s_next;
181 	}
182 
183 	return (n);
184 }
185 
186 int
stream_open_for_read(sort_t * S,stream_t * str)187 stream_open_for_read(sort_t *S, stream_t *str)
188 {
189 	int fd;
190 
191 	ASSERT(!(str->s_status & STREAM_OUTPUT));
192 
193 	/*
194 	 * STREAM_ARRAY streams are open by definition.
195 	 */
196 	if ((str->s_status & STREAM_SOURCE_MASK) == STREAM_ARRAY) {
197 		stream_set(str, STREAM_ARRAY | STREAM_OPEN);
198 		return (1);
199 	}
200 
201 	/*
202 	 * Set data type according to locale for input from stdin.
203 	 */
204 	if (str->s_status & STREAM_NOTFILE) {
205 		str->s_type.BF.s_fp = stdin;
206 		stream_set(str, STREAM_OPEN | (S->m_single_byte_locale ?
207 		    STREAM_SINGLE : STREAM_WIDE));
208 		return (1);
209 	}
210 
211 	ASSERT(str->s_filename);
212 
213 #ifndef DEBUG_DISALLOW_MMAP
214 	if (S->m_single_byte_locale &&
215 	    str->s_filesize > 0 &&
216 	    str->s_filesize < SSIZE_MAX) {
217 		/*
218 		 * make mmap() attempt; set s_status and return if successful
219 		 */
220 		fd = open(str->s_filename, O_RDONLY);
221 		if (fd < 0) {
222 			if (errno == EMFILE || errno == ENFILE)
223 				return (-1);
224 			else
225 				die(EMSG_OPEN, str->s_filename);
226 		}
227 		str->s_buffer = mmap(0, str->s_filesize, PROT_READ,
228 		    MAP_SHARED, fd, 0);
229 
230 		if (str->s_buffer != MAP_FAILED) {
231 			str->s_buffer_size = str->s_filesize;
232 			str->s_type.SF.s_fd = fd;
233 
234 			stream_set(str, STREAM_MMAP | STREAM_OPEN);
235 			stream_unset(str, STREAM_PRIMED);
236 			return (1);
237 		}
238 
239 		/*
240 		 * Otherwise the mmap() failed due to address space exhaustion;
241 		 * since we have already opened the file, we close it and drop
242 		 * into the normal (STDIO) case.
243 		 */
244 		(void) close(fd);
245 		str->s_buffer = NULL;
246 	}
247 #endif /* DEBUG_DISALLOW_MMAP */
248 
249 	if ((str->s_type.BF.s_fp = fopen(str->s_filename, "r")) == NULL) {
250 		if (errno == EMFILE || errno == ENFILE)
251 			return (-1);
252 		else
253 			die(EMSG_OPEN, str->s_filename);
254 	}
255 
256 	str->s_type.BF.s_vbuf = safe_realloc(NULL, STDIO_VBUF_SIZE);
257 	if (setvbuf(str->s_type.BF.s_fp, str->s_type.BF.s_vbuf, _IOFBF,
258 	    STDIO_VBUF_SIZE) != 0) {
259 		safe_free(str->s_type.BF.s_vbuf);
260 		str->s_type.BF.s_vbuf = NULL;
261 	}
262 
263 	stream_set(str, STREAM_OPEN | (S->m_single_byte_locale ? STREAM_SINGLE :
264 	    STREAM_WIDE));
265 	stream_unset(str, STREAM_PRIMED);
266 
267 	return (1);
268 }
269 
270 void
stream_set_size(stream_t * str,size_t new_size)271 stream_set_size(stream_t *str, size_t new_size)
272 {
273 	/*
274 	 * p_new_size is new_size rounded upwards to nearest multiple of
275 	 * PAGESIZE, since mmap() is going to reserve it in any case.  This
276 	 * ensures that the far end of the buffer is also aligned, such that we
277 	 * obtain aligned pointers if we choose to subtract from it.
278 	 */
279 	size_t p_new_size = (new_size + PAGESIZE) & ~(PAGESIZE - 1);
280 
281 	if (str->s_buffer_size == p_new_size)
282 		return;
283 
284 	if (str->s_buffer != NULL)
285 		(void) munmap(str->s_buffer, str->s_buffer_size);
286 
287 	if (new_size == 0) {
288 		str->s_buffer = NULL;
289 		str->s_buffer_size = 0;
290 		return;
291 	}
292 
293 	str->s_buffer = xzmap(0, p_new_size, PROT_READ | PROT_WRITE,
294 	    MAP_PRIVATE, 0);
295 
296 	if (str->s_buffer == MAP_FAILED)
297 		die(EMSG_MMAP);
298 
299 	str->s_buffer_size = p_new_size;
300 }
301 
302 void
stream_add_file_to_chain(stream_t ** str_chain,char * filename)303 stream_add_file_to_chain(stream_t **str_chain, char *filename)
304 {
305 	stream_t *str;
306 
307 	str = stream_new(STREAM_NO_SOURCE);
308 
309 	str->s_filename = filename;
310 	str->s_type.SF.s_fd = -1;
311 
312 	stream_push_to_chain(str_chain, str);
313 }
314 
315 void
stream_push_to_chain(stream_t ** str_chain,stream_t * streamp)316 stream_push_to_chain(stream_t **str_chain, stream_t *streamp)
317 {
318 	stream_t *cur_streamp = *str_chain;
319 
320 	if (cur_streamp == NULL) {
321 		*str_chain = streamp;
322 		streamp->s_next = NULL;
323 		return;
324 	}
325 
326 	while (cur_streamp->s_next != NULL)
327 		cur_streamp = cur_streamp->s_next;
328 
329 	cur_streamp->s_next = streamp;
330 	streamp->s_previous = cur_streamp;
331 	streamp->s_next = NULL;
332 }
333 
334 static void
stream_dump(stream_t * str_in,stream_t * str_out)335 stream_dump(stream_t *str_in, stream_t *str_out)
336 {
337 	ASSERT(!(str_in->s_status & STREAM_OUTPUT));
338 	ASSERT(str_out->s_status & STREAM_OUTPUT);
339 
340 	SOP_PUT_LINE(str_out, &str_in->s_current);
341 
342 	while (!SOP_EOS(str_in)) {
343 		SOP_FETCH(str_in);
344 		SOP_PUT_LINE(str_out, &str_in->s_current);
345 	}
346 }
347 
348 /*
349  * stream_push_to_temporary() with flags set to ST_CACHE merely copies the
350  * stream_t pointer onto the chain.  With flags set to ST_NOCACHE, the stream is
351  * written out to a file.  Stream pointers passed to stream_push_to_temporary()
352  * must refer to allocated objects, and not to objects created on function
353  * stacks.  Finally, if strp == NULL, stream_push_to_temporary() creates and
354  * pushes the new stream; the output stream is left open if ST_OPEN is set.
355  */
356 stream_t *
stream_push_to_temporary(stream_t ** str_chain,stream_t * streamp,int flags)357 stream_push_to_temporary(stream_t **str_chain, stream_t *streamp, int flags)
358 {
359 	stream_t *out_streamp;
360 
361 	if (flags & ST_CACHE) {
362 		ASSERT(streamp->s_status & STREAM_ARRAY);
363 		stream_set(streamp, STREAM_NOT_FREEABLE | STREAM_TEMPORARY);
364 		stream_push_to_chain(str_chain, streamp);
365 		return (streamp);
366 	}
367 
368 	out_streamp = safe_realloc(NULL, sizeof (stream_t));
369 
370 	if (streamp != NULL) {
371 		stream_copy(out_streamp, streamp);
372 		stream_unset(out_streamp, STREAM_OPEN);
373 		ASSERT(streamp->s_element_size == sizeof (char) ||
374 		    streamp->s_element_size == sizeof (wchar_t));
375 		stream_set(out_streamp,
376 		    streamp->s_element_size == 1 ? STREAM_SINGLE : STREAM_WIDE);
377 		out_streamp->s_buffer = NULL;
378 		out_streamp->s_buffer_size = 0;
379 	} else {
380 		stream_clear(out_streamp);
381 		stream_set(out_streamp, flags & ST_WIDE ? STREAM_WIDE :
382 		    STREAM_SINGLE);
383 	}
384 
385 	(void) bump_file_template();
386 	out_streamp->s_filename = strdup(get_file_template());
387 
388 	if (SOP_OPEN_FOR_WRITE(out_streamp) == -1)
389 		return (NULL);
390 
391 	stream_set(out_streamp, STREAM_TEMPORARY);
392 	stream_push_to_chain(str_chain, out_streamp);
393 
394 	if (streamp != NULL) {
395 		/*
396 		 * We reset the input stream to the beginning, and copy it in
397 		 * sequence to the output stream, freeing the raw_collate field
398 		 * as we go.
399 		 */
400 		if (SOP_PRIME(streamp) != PRIME_SUCCEEDED)
401 			die(EMSG_BADPRIME);
402 		stream_dump(streamp, out_streamp);
403 	}
404 
405 	if (!(flags & ST_OPEN)) {
406 		SOP_FREE(out_streamp);
407 		(void) SOP_CLOSE(out_streamp);
408 	}
409 
410 	/*
411 	 * Now that we've written this stream to disk, we needn't protect any
412 	 * in-memory consumer.
413 	 */
414 	if (streamp != NULL)
415 		streamp->s_consumer = NULL;
416 
417 	return (out_streamp);
418 }
419 
420 void
stream_close_all_previous(stream_t * tail_streamp)421 stream_close_all_previous(stream_t *tail_streamp)
422 {
423 	stream_t *cur_streamp;
424 
425 	ASSERT(tail_streamp != NULL);
426 
427 	cur_streamp = tail_streamp->s_previous;
428 	while (cur_streamp != NULL) {
429 		(void) SOP_FREE(cur_streamp);
430 		if (SOP_IS_CLOSABLE(cur_streamp))
431 			(void) SOP_CLOSE(cur_streamp);
432 
433 		cur_streamp = cur_streamp->s_previous;
434 	}
435 }
436 
437 void
stream_unlink_temporary(stream_t * streamp)438 stream_unlink_temporary(stream_t *streamp)
439 {
440 	if (streamp->s_status & STREAM_TEMPORARY) {
441 		(void) SOP_FREE(streamp);
442 
443 		if (streamp->s_ops.sop_unlink)
444 			(void) SOP_UNLINK(streamp);
445 	}
446 }
447 
448 /*
449  * stream_insert() takes input from src stream, converts to each line to
450  * collatable form, and places a line_rec_t in dest stream, which is of type
451  * STREAM_ARRAY.
452  */
453 int
stream_insert(sort_t * S,stream_t * src,stream_t * dest)454 stream_insert(sort_t *S, stream_t *src, stream_t *dest)
455 {
456 	ssize_t i = dest->s_type.LA.s_array_size;
457 	line_rec_t *l_series;
458 	char *l_convert = dest->s_buffer;
459 	int return_val = ST_MEM_AVAIL;
460 	int fetch_result = NEXT_LINE_COMPLETE;
461 
462 	/*
463 	 * Scan through until total bytes allowed accumulated, and return.
464 	 * Use SOP_FETCH(src) so that this works for all stream types,
465 	 * and so that we can repeat until eos.
466 	 *
467 	 * For each new line, we move back sizeof (line_rec_t) from the end of
468 	 * the array buffer, and copy into the start of the array buffer.  When
469 	 * the pointers meet, or when we exhaust the current stream, we return.
470 	 * If we have not filled the current memory allocation, we return
471 	 * ST_MEM_AVAIL, else we return ST_MEM_FILLED.
472 	 */
473 	ASSERT(stream_is_primed(src));
474 	ASSERT(dest->s_status & STREAM_ARRAY);
475 
476 	/*LINTED ALIGNMENT*/
477 	l_series = (line_rec_t *)((caddr_t)dest->s_buffer
478 	    + dest->s_buffer_size) - dest->s_type.LA.s_array_size;
479 
480 	if (dest->s_type.LA.s_array_size)
481 		l_convert = l_series->l_collate.sp +
482 		    l_series->l_collate_length + src->s_element_size;
483 
484 	/*
485 	 * current line has been set prior to entry
486 	 */
487 	src->s_current.l_collate.sp = l_convert;
488 	src->s_current.l_collate_bufsize = (caddr_t)l_series
489 	    - (caddr_t)l_convert - sizeof (line_rec_t);
490 	src->s_current.l_raw_collate.sp = NULL;
491 
492 	if (src->s_current.l_collate_bufsize <= 0)
493 		return (ST_MEM_FILLED);
494 
495 	src->s_consumer = dest;
496 
497 	while (src->s_current.l_collate_bufsize > 0 &&
498 	    (src->s_current.l_collate_length = S->m_coll_convert(
499 	    S->m_fields_head, &src->s_current, FCV_FAIL,
500 	    S->m_field_separator)) >= 0) {
501 		ASSERT((char *)l_series > l_convert);
502 		l_series--;
503 		l_convert += src->s_current.l_collate_length;
504 
505 		if ((char *)l_series <= l_convert) {
506 			__S(stats_incr_insert_filled_downward());
507 			l_series++;
508 			return_val = ST_MEM_FILLED;
509 			break;
510 		}
511 
512 		/*
513 		 * There's no collision with the lower part of the buffer, so we
514 		 * can safely begin processing the line.  In the debug case, we
515 		 * test for uninitialized data by copying a non-zero pattern.
516 		 */
517 #ifdef DEBUG
518 		memset(l_series, 0x1ff11ff1, sizeof (line_rec_t));
519 #endif
520 
521 		copy_line_rec(&src->s_current, l_series);
522 		i++;
523 
524 		if (SOP_EOS(src) ||
525 		    (fetch_result = SOP_FETCH(src)) == NEXT_LINE_INCOMPLETE)
526 			break;
527 
528 		src->s_current.l_collate.sp = l_convert;
529 		src->s_current.l_collate_bufsize = (caddr_t)l_series
530 		    - (caddr_t)l_convert - sizeof (line_rec_t);
531 		src->s_current.l_raw_collate.sp = NULL;
532 	}
533 
534 	if (fetch_result == NEXT_LINE_INCOMPLETE) {
535 		__S(stats_incr_insert_filled_input());
536 		return_val = ST_MEM_FILLED;
537 	} else if (src->s_current.l_collate_length < 0 ||
538 	    src->s_current.l_collate_bufsize <= 0) {
539 		__S(stats_incr_insert_filled_upward());
540 		return_val = ST_MEM_FILLED;
541 	}
542 
543 	if (fetch_result != NEXT_LINE_INCOMPLETE &&
544 	    src->s_current.l_collate_length < 0 &&
545 	    i == 0)
546 		/*
547 		 * There's no room for conversion of our only line; need to
548 		 * execute with larger memory.
549 		 */
550 		die(EMSG_MEMORY);
551 
552 	/*
553 	 * Set up pointer array to line records.
554 	 */
555 	if (i > dest->s_type.LA.s_array_size)
556 		dest->s_type.LA.s_array = safe_realloc(dest->s_type.LA.s_array,
557 		    sizeof (line_rec_t *) * i);
558 	dest->s_type.LA.s_array_size = i;
559 
560 	i = 0;
561 	while (i < dest->s_type.LA.s_array_size) {
562 		dest->s_type.LA.s_array[i] = l_series;
563 		l_series++;
564 		i++;
565 	}
566 
567 	/*
568 	 * LINES_ARRAY streams are always open.
569 	 */
570 	stream_set(dest, STREAM_OPEN);
571 
572 	return (return_val);
573 }
574 
575 /*
576  * stream_swap_buffer() exchanges the stream's buffer with the proffered one;
577  * s_current is not adjusted so this is safe only for STREAM_INSTANT.
578  */
579 void
stream_swap_buffer(stream_t * str,char ** buf,size_t * size)580 stream_swap_buffer(stream_t *str, char **buf, size_t *size)
581 {
582 	void *tb = *buf;
583 	size_t ts = *size;
584 
585 	*buf = str->s_buffer;
586 	*size = str->s_buffer_size;
587 
588 	str->s_buffer = tb;
589 	str->s_buffer_size = ts;
590 }
591