Include limits.h.
[kopensolaris-gnu/glibc.git] / rt / aio_misc.c
1 /* Handle general operations.
2    Copyright (C) 1997, 1998 Free Software Foundation, Inc.
3    This file is part of the GNU C Library.
4    Contributed by Ulrich Drepper <drepper@cygnus.com>, 1997.
5
6    The GNU C Library is free software; you can redistribute it and/or
7    modify it under the terms of the GNU Library General Public License as
8    published by the Free Software Foundation; either version 2 of the
9    License, or (at your option) any later version.
10
11    The GNU C Library is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14    Library General Public License for more details.
15
16    You should have received a copy of the GNU Library General Public
17    License along with the GNU C Library; see the file COPYING.LIB.  If not,
18    write to the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19    Boston, MA 02111-1307, USA.  */
20
21 #include <aio.h>
22 #include <errno.h>
23 #include <limits.h>
24 #include <pthread.h>
25 #include <stdlib.h>
26 #include <unistd.h>
27 #include <sys/stat.h>
28
29 #include "aio_misc.h"
30
31 /* Pool of request list entries.  */
32 static struct requestlist **pool;
33
34 /* Number of total and allocated pool entries.  */
35 static size_t pool_tab_size;
36 static size_t pool_size;
37
38 /* We implement a two dimensional array but allocate each row separately.
39    The macro below determines how many entries should be used per row.
40    It should better be a power of two.  */
41 #define ENTRIES_PER_ROW 16
42
43 /* The row table is incremented in units of this.  */
44 #define ROW_STEP        8
45
46 /* List of available entries.  */
47 static struct requestlist *freelist;
48
49 /* List of request waiting to be processed.  */
50 static struct requestlist *runlist;
51
52 /* Structure list of all currently processed requests.  */
53 static struct requestlist *requests;
54
55 /* Number of threads currently running.  */
56 static int nthreads;
57
58
59 /* These are the values used to optimize the use of AIO.  The user can
60    overwrite them by using the `aio_init' function.  */
61 static struct aioinit optim =
62 {
63   20,   /* int aio_threads;     Maximal number of threads.  */
64   256,  /* int aio_num;         Number of expected simultanious requests. */
65   0,
66   0,
67   0,
68   0,
69   { 0, }
70 };
71
72
73 /* Since the list is global we need a mutex protecting it.  */
74 pthread_mutex_t __aio_requests_mutex = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;
75
76
77 /* Functions to handle request list pool.  */
78 static struct requestlist *
79 get_elem (void)
80 {
81   struct requestlist *result;
82
83   if (freelist == NULL)
84     {
85       struct requestlist *new_row;
86       size_t new_size;
87
88       /* Compute new size.  */
89       new_size = pool_size ? pool_size + ENTRIES_PER_ROW : optim.aio_num;
90
91       if ((new_size / ENTRIES_PER_ROW) >= pool_tab_size)
92         {
93           size_t new_tab_size = new_size / ENTRIES_PER_ROW;
94           struct requestlist **new_tab;
95
96           new_tab = (struct requestlist **)
97             realloc (pool, (new_tab_size * sizeof (struct requestlist *)));
98
99           if (new_tab == NULL)
100             return NULL;
101
102           pool_tab_size = new_tab_size;
103           pool = new_tab;
104         }
105
106       if (pool_size == 0)
107         {
108           size_t cnt;
109
110           new_row = (struct requestlist *)
111             calloc (new_size, sizeof (struct requestlist));
112
113           if (new_row == NULL)
114             return NULL;
115
116           for (cnt = 0; cnt < new_size / ENTRIES_PER_ROW; ++cnt)
117             pool[cnt] = &new_row[cnt * ENTRIES_PER_ROW];
118         }
119       else
120         {
121           /* Allocat one new row.  */
122           new_row = (struct requestlist *)
123             calloc (ENTRIES_PER_ROW, sizeof (struct requestlist));
124           if (new_row == NULL)
125             return NULL;
126
127           pool[new_size / ENTRIES_PER_ROW] = new_row;
128         }
129
130       /* Put all the new entries in the freelist.  */
131       do
132         {
133           new_row->next_prio = freelist;
134           freelist = new_row++;
135         }
136       while (++pool_size < new_size);
137     }
138
139   result = freelist;
140   freelist = freelist->next_prio;
141
142   return result;
143 }
144
145
146 void
147 __aio_free_request (struct requestlist *elem)
148 {
149   elem->running = no;
150   elem->next_prio = freelist;
151   freelist = elem;
152 }
153
154
155 struct requestlist *
156 __aio_find_req (aiocb_union *elem)
157 {
158   struct requestlist *runp = requests;
159   int fildes = elem->aiocb.aio_fildes;
160
161   while (runp != NULL && runp->aiocbp->aiocb.aio_fildes < fildes)
162     runp = runp->next_fd;
163
164   if (runp != NULL)
165     if (runp->aiocbp->aiocb.aio_fildes != fildes)
166       runp = NULL;
167     else
168       while (runp != NULL && runp->aiocbp != elem)
169         runp = runp->next_prio;
170
171   return runp;
172 }
173
174
175 struct requestlist *
176 __aio_find_req_fd (int fildes)
177 {
178   struct requestlist *runp = requests;
179
180   while (runp != NULL && runp->aiocbp->aiocb.aio_fildes < fildes)
181     runp = runp->next_fd;
182
183   return (runp != NULL && runp->aiocbp->aiocb.aio_fildes == fildes
184           ? runp : NULL);
185 }
186
187
188 /* The thread handler.  */
189 static void *handle_fildes_io (void *arg);
190
191
192 /* User optimization.  */
193 void
194 __aio_init (const struct aioinit *init)
195 {
196   /* Get the mutex.  */
197   pthread_mutex_lock (&__aio_requests_mutex);
198
199   /* Only allow writing new values if the table is not yet allocated.  */
200   if (pool == NULL)
201     {
202       optim.aio_threads = init->aio_threads < 1 ? 1 : init->aio_threads;
203       optim.aio_num = (init->aio_num < ENTRIES_PER_ROW
204                        ? ENTRIES_PER_ROW
205                        : init->aio_num & ~ENTRIES_PER_ROW);
206     }
207
208   /* Release the mutex.  */
209   pthread_mutex_unlock (&__aio_requests_mutex);
210 }
211 weak_alias (__aio_init, aio_init)
212
213
214 /* The main function of the async I/O handling.  It enqueues requests
215    and if necessary starts and handles threads.  */
216 struct requestlist *
217 __aio_enqueue_request (aiocb_union *aiocbp, int operation)
218 {
219   int result = 0;
220   int policy, prio;
221   struct sched_param param;
222   struct requestlist *last, *runp, *newp;
223   int running = no;
224
225   if (aiocbp->aiocb.aio_reqprio < 0
226       || aiocbp->aiocb.aio_reqprio > AIO_PRIO_DELTA_MAX)
227     {
228       /* Invalid priority value.  */
229       __set_errno (EINVAL);
230       aiocbp->aiocb.__error_code = EINVAL;
231       aiocbp->aiocb.__return_value = -1;
232       return NULL;
233     }
234
235   /* Compute priority for this request.  */
236   pthread_getschedparam (pthread_self (), &policy, &param);
237   prio = param.sched_priority - aiocbp->aiocb.aio_reqprio;
238
239   /* Get the mutex.  */
240   pthread_mutex_lock (&__aio_requests_mutex);
241
242   last = NULL;
243   runp = requests;
244   /* First look whether the current file descriptor is currently
245      worked with.  */
246   while (runp != NULL
247          && runp->aiocbp->aiocb.aio_fildes < aiocbp->aiocb.aio_fildes)
248     {
249       last = runp;
250       runp = runp->next_fd;
251     }
252
253   /* Get a new element for the waiting list.  */
254   newp = get_elem ();
255   if (newp == NULL)
256     {
257       __set_errno (EAGAIN);
258       pthread_mutex_unlock (&__aio_requests_mutex);
259       return NULL;
260     }
261   newp->aiocbp = aiocbp;
262   newp->waiting = NULL;
263
264   aiocbp->aiocb.__abs_prio = prio;
265   aiocbp->aiocb.__policy = policy;
266   aiocbp->aiocb.aio_lio_opcode = operation;
267   aiocbp->aiocb.__error_code = EINPROGRESS;
268   aiocbp->aiocb.__return_value = 0;
269
270   if (runp != NULL
271       && runp->aiocbp->aiocb.aio_fildes == aiocbp->aiocb.aio_fildes)
272     {
273       /* The current file descriptor is worked on.  It makes no sense
274          to start another thread since this new thread would fight
275          with the running thread for the resources.  But we also cannot
276          say that the thread processing this desriptor shall immediately
277          after finishing the current job process this request if there
278          are other threads in the running queue which have a higher
279          priority.  */
280
281       /* Simply enqueue it after the running one according to the
282          priority.  */
283       while (runp->next_prio != NULL
284              && runp->next_prio->aiocbp->aiocb.__abs_prio >= prio)
285         runp = runp->next_prio;
286
287       newp->next_prio = runp->next_prio;
288       runp->next_prio = newp;
289
290       running = queued;
291     }
292   else
293     {
294       /* Enqueue this request for a new descriptor.  */
295       if (last == NULL)
296         {
297           newp->last_fd = NULL;
298           newp->next_fd = requests;
299           if (requests != NULL)
300             requests->last_fd = newp;
301           requests = newp;
302         }
303       else
304         {
305           newp->next_fd = last->next_fd;
306           newp->last_fd = last;
307           last->next_fd = newp;
308           if (newp->next_fd != NULL)
309             newp->next_fd->last_fd = newp;
310         }
311
312       newp->next_prio = NULL;
313     }
314
315   if (running == no)
316     {
317       /* We try to create a new thread for this file descriptor.  The
318          function which gets called will handle all available requests
319          for this descriptor and when all are processed it will
320          terminate.
321
322          If no new thread can be created or if the specified limit of
323          threads for AIO is reached we queue the request.  */
324
325       /* See if we can create a thread.  */
326       if (nthreads < optim.aio_threads)
327         {
328           pthread_t thid;
329           pthread_attr_t attr;
330
331           /* Make sure the thread is created detached.  */
332           pthread_attr_init (&attr);
333           pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
334
335           /* Now try to start a thread.  */
336           if (pthread_create (&thid, &attr, handle_fildes_io, newp) == 0)
337             {
338               /* We managed to enqueue the request.  All errors which can
339                  happen now can be recognized by calls to `aio_return' and
340                  `aio_error'.  */
341               running = allocated;
342               ++nthreads;
343             }
344           else if (nthreads == 0)
345             /* We cannot create a thread in the moment and there is
346                also no thread running.  This is a problem.  `errno' is
347                set to EAGAIN if this is only a temporary problem.  */
348             result = -1;
349         }
350     }
351
352   /* Enqueue the request in the run queue if it is not yet running.  */
353   if (running < yes && result == 0)
354     {
355       if (runlist == NULL || runlist->aiocbp->aiocb.__abs_prio < prio)
356         {
357           newp->next_run = runlist;
358           runlist = newp;
359         }
360       else
361         {
362           runp = runlist;
363
364           while (runp->next_run != NULL
365                  && runp->next_run->aiocbp->aiocb.__abs_prio >= prio)
366             runp = runp->next_run;
367
368           newp->next_run = runp->next_run;
369           runp->next_run = newp;
370         }
371     }
372
373   if (result == 0)
374     newp->running = running;
375   else
376     {
377       /* Something went wrong.  */
378       __aio_free_request (newp);
379       newp = NULL;
380     }
381
382   /* Release the mutex.  */
383   pthread_mutex_unlock (&__aio_requests_mutex);
384
385   return newp;
386 }
387
388
389 static void *
390 handle_fildes_io (void *arg)
391 {
392   pthread_t self = pthread_self ();
393   struct sched_param param;
394   struct requestlist *runp = (struct requestlist *) arg;
395   aiocb_union *aiocbp;
396   int policy;
397   int fildes;
398
399   pthread_getschedparam (self, &policy, &param);
400
401   do
402     {
403       /* Update our variables.  */
404       aiocbp = runp->aiocbp;
405       fildes = aiocbp->aiocb.aio_fildes;
406
407       /* Change the priority to the requested value (if necessary).  */
408       if (aiocbp->aiocb.__abs_prio != param.sched_priority
409           || aiocbp->aiocb.__policy != policy)
410         {
411           param.sched_priority = aiocbp->aiocb.__abs_prio;
412           policy = aiocbp->aiocb.__policy;
413           pthread_setschedparam (self, policy, &param);
414         }
415
416       /* Process request pointed to by RUNP.  We must not be disturbed
417          by signals.  */
418       if ((aiocbp->aiocb.aio_lio_opcode & 127) == LIO_READ)
419         {
420           if (aiocbp->aiocb.aio_lio_opcode & 128)
421             aiocbp->aiocb.__return_value =
422               TEMP_FAILURE_RETRY (__pread64 (fildes,
423                                              (void *) aiocbp->aiocb64.aio_buf,
424                                              aiocbp->aiocb64.aio_nbytes,
425                                              aiocbp->aiocb64.aio_offset));
426           else
427             aiocbp->aiocb.__return_value =
428               TEMP_FAILURE_RETRY (pread (fildes,
429                                          (void *) aiocbp->aiocb.aio_buf,
430                                          aiocbp->aiocb.aio_nbytes,
431                                          aiocbp->aiocb.aio_offset));
432         }
433       else if ((aiocbp->aiocb.aio_lio_opcode & 127) == LIO_WRITE)
434         {
435           if (aiocbp->aiocb.aio_lio_opcode & 128)
436             aiocbp->aiocb.__return_value =
437               TEMP_FAILURE_RETRY (__pwrite64 (fildes,
438                                               (const void *) aiocbp->aiocb64.aio_buf,
439                                               aiocbp->aiocb64.aio_nbytes,
440                                               aiocbp->aiocb64.aio_offset));
441           else
442             aiocbp->aiocb.__return_value =
443               TEMP_FAILURE_RETRY (pwrite (fildes,
444                                           (const void *) aiocbp->aiocb.aio_buf,
445                                           aiocbp->aiocb.aio_nbytes,
446                                           aiocbp->aiocb.aio_offset));
447         }
448       else if (aiocbp->aiocb.aio_lio_opcode == LIO_DSYNC)
449         aiocbp->aiocb.__return_value = TEMP_FAILURE_RETRY (fdatasync (fildes));
450       else if (aiocbp->aiocb.aio_lio_opcode == LIO_SYNC)
451         aiocbp->aiocb.__return_value = TEMP_FAILURE_RETRY (fsync (fildes));
452       else
453         {
454           /* This is an invalid opcode.  */
455           aiocbp->aiocb.__return_value = -1;
456           __set_errno (EINVAL);
457         }
458
459       /* Get the mutex.  */
460       pthread_mutex_lock (&__aio_requests_mutex);
461
462       if (aiocbp->aiocb.__return_value == -1)
463         aiocbp->aiocb.__error_code = errno;
464       else
465         aiocbp->aiocb.__error_code = 0;
466
467       /* Send the signal to notify about finished processing of the
468          request.  */
469       __aio_notify (runp);
470
471       /* Now dequeue the current request.  */
472       if (runp->next_prio == NULL)
473         {
474           /* No outstanding request for this descriptor.  Remove this
475              descriptor from the list.  */
476           if (runp->next_fd != NULL)
477             runp->next_fd->last_fd = runp->last_fd;
478           if (runp->last_fd != NULL)
479             runp->last_fd->next_fd = runp->next_fd;
480           else
481             requests = runp->next_fd;
482         }
483       else
484         {
485           runp->next_prio->last_fd = runp->last_fd;
486           runp->next_prio->next_fd = runp->next_fd;
487           runp->next_prio->running = yes;
488           if (runp->next_fd != NULL)
489             runp->next_fd->last_fd = runp->next_prio;
490           if (runp->last_fd != NULL)
491             runp->last_fd->next_fd = runp->next_prio;
492           else
493             requests = runp->next_prio;
494         }
495
496       /* Free the old element.  */
497       __aio_free_request (runp);
498
499       runp = runlist;
500       if (runp != NULL)
501         {
502           /* We must not run requests which are not marked `running'.  */
503           if (runp->running == yes)
504             runlist = runp->next_run;
505           else
506             {
507               struct requestlist *old;
508
509               do
510                 {
511                   old = runp;
512                   runp = runp->next_run;
513                 }
514               while (runp != NULL && runp->running != yes);
515
516               if (runp != NULL)
517                 old->next_run = runp->next_run;
518             }
519         }
520
521       /* If no request to work on we will stop the thread.  */
522       if (runp == NULL)
523         --nthreads;
524       else
525         runp->running = allocated;
526
527       /* Release the mutex.  */
528       pthread_mutex_unlock (&__aio_requests_mutex);
529     }
530   while (runp != NULL);
531
532   pthread_exit (NULL);
533 }
534
535
536 /* Free allocated resources.  */
537 static void
538 __attribute__ ((unused))
539 free_res (void)
540 {
541   size_t row;
542
543   /* The first block of rows as specified in OPTIM is allocated in
544      one chunk.  */
545   free (pool[0]);
546
547   for (row = optim.aio_num / ENTRIES_PER_ROW; row < pool_tab_size; ++row)
548     free (pool[row]);
549
550   free (pool);
551 }
552
553 text_set_element (__libc_subfreeres, free_res);