(get_elem): Assign pointer to new row to correct pool entry.
[kopensolaris-gnu/glibc.git] / rt / aio_misc.c
1 /* Handle general operations.
2    Copyright (C) 1997, 1998, 1999, 2000 Free Software Foundation, Inc.
3    This file is part of the GNU C Library.
4    Contributed by Ulrich Drepper <drepper@cygnus.com>, 1997.
5
6    The GNU C Library is free software; you can redistribute it and/or
7    modify it under the terms of the GNU Library General Public License as
8    published by the Free Software Foundation; either version 2 of the
9    License, or (at your option) any later version.
10
11    The GNU C Library is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14    Library General Public License for more details.
15
16    You should have received a copy of the GNU Library General Public
17    License along with the GNU C Library; see the file COPYING.LIB.  If not,
18    write to the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19    Boston, MA 02111-1307, USA.  */
20
21 #include <aio.h>
22 #include <assert.h>
23 #include <errno.h>
24 #include <limits.h>
25 #include <pthread.h>
26 #include <stdlib.h>
27 #include <unistd.h>
28 #include <sys/stat.h>
29 #include <sys/time.h>
30
31 #include "aio_misc.h"
32
33 static void add_request_to_runlist (struct requestlist *newrequest);
34
35 /* Pool of request list entries.  */
36 static struct requestlist **pool;
37
38 /* Number of total and allocated pool entries.  */
39 static size_t pool_tab_size;
40 static size_t pool_size;
41
42 /* We implement a two dimensional array but allocate each row separately.
43    The macro below determines how many entries should be used per row.
44    It should better be a power of two.  */
45 #define ENTRIES_PER_ROW 16
46
47 /* The row table is incremented in units of this.  */
48 #define ROW_STEP        8
49
50 /* List of available entries.  */
51 static struct requestlist *freelist;
52
53 /* List of request waiting to be processed.  */
54 static struct requestlist *runlist;
55
56 /* Structure list of all currently processed requests.  */
57 static struct requestlist *requests;
58
59 /* Number of threads currently running.  */
60 static int nthreads;
61
62 /* Number of threads waiting for work to arrive. */
63 static int idle_thread_count;
64
65
66 /* These are the values used to optimize the use of AIO.  The user can
67    overwrite them by using the `aio_init' function.  */
68 static struct aioinit optim =
69 {
70   20,   /* int aio_threads;     Maximal number of threads.  */
71   256,  /* int aio_num;         Number of expected simultanious requests. */
72   0,
73   0,
74   0,
75   0,
76   1,
77   0
78 };
79
80
81 /* Since the list is global we need a mutex protecting it.  */
82 pthread_mutex_t __aio_requests_mutex = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;
83
84 /* When you add a request to the list and there are idle threads present,
85    you signal this condition variable. When a thread finishes work, it waits
86    on this condition variable for a time before it actually exits. */
87 pthread_cond_t __aio_new_request_notification = PTHREAD_COND_INITIALIZER;
88
89
90 /* Functions to handle request list pool.  */
91 static struct requestlist *
92 get_elem (void)
93 {
94   struct requestlist *result;
95
96   if (freelist == NULL)
97     {
98       struct requestlist *new_row;
99       size_t new_size;
100
101       assert (sizeof (struct aiocb) == sizeof (struct aiocb64));
102
103       /* Compute new size.  */
104       new_size = pool_size ? pool_size + ENTRIES_PER_ROW : optim.aio_num;
105
106       if ((new_size / ENTRIES_PER_ROW) >= pool_tab_size)
107         {
108           size_t new_tab_size = new_size / ENTRIES_PER_ROW;
109           struct requestlist **new_tab;
110
111           new_tab = (struct requestlist **)
112             realloc (pool, (new_tab_size * sizeof (struct requestlist *)));
113
114           if (new_tab == NULL)
115             return NULL;
116
117           pool_tab_size = new_tab_size;
118           pool = new_tab;
119         }
120
121       if (pool_size == 0)
122         {
123           size_t cnt;
124
125           new_row = (struct requestlist *)
126             calloc (new_size, sizeof (struct requestlist));
127
128           if (new_row == NULL)
129             return NULL;
130
131           for (cnt = 0; cnt < new_size / ENTRIES_PER_ROW; ++cnt)
132             pool[cnt] = &new_row[cnt * ENTRIES_PER_ROW];
133         }
134       else
135         {
136           /* Allocat one new row.  */
137           new_row = (struct requestlist *)
138             calloc (ENTRIES_PER_ROW, sizeof (struct requestlist));
139           if (new_row == NULL)
140             return NULL;
141
142           pool[new_size / ENTRIES_PER_ROW - 1] = new_row;
143         }
144
145       /* Put all the new entries in the freelist.  */
146       do
147         {
148           new_row->next_prio = freelist;
149           freelist = new_row++;
150         }
151       while (++pool_size < new_size);
152     }
153
154   result = freelist;
155   freelist = freelist->next_prio;
156
157   return result;
158 }
159
160
161 void
162 internal_function
163 __aio_free_request (struct requestlist *elem)
164 {
165   elem->running = no;
166   elem->next_prio = freelist;
167   freelist = elem;
168 }
169
170
171 struct requestlist *
172 internal_function
173 __aio_find_req (aiocb_union *elem)
174 {
175   struct requestlist *runp = requests;
176   int fildes = elem->aiocb.aio_fildes;
177
178   while (runp != NULL && runp->aiocbp->aiocb.aio_fildes < fildes)
179     runp = runp->next_fd;
180
181   if (runp != NULL)
182     {
183       if (runp->aiocbp->aiocb.aio_fildes != fildes)
184         runp = NULL;
185       else
186         while (runp != NULL && runp->aiocbp != elem)
187           runp = runp->next_prio;
188     }
189
190   return runp;
191 }
192
193
194 struct requestlist *
195 internal_function
196 __aio_find_req_fd (int fildes)
197 {
198   struct requestlist *runp = requests;
199
200   while (runp != NULL && runp->aiocbp->aiocb.aio_fildes < fildes)
201     runp = runp->next_fd;
202
203   return (runp != NULL && runp->aiocbp->aiocb.aio_fildes == fildes
204           ? runp : NULL);
205 }
206
207
208 /* The thread handler.  */
209 static void *handle_fildes_io (void *arg);
210
211
212 /* User optimization.  */
213 void
214 __aio_init (const struct aioinit *init)
215 {
216   /* Get the mutex.  */
217   pthread_mutex_lock (&__aio_requests_mutex);
218
219   /* Only allow writing new values if the table is not yet allocated.  */
220   if (pool == NULL)
221     {
222       optim.aio_threads = init->aio_threads < 1 ? 1 : init->aio_threads;
223       optim.aio_num = (init->aio_num < ENTRIES_PER_ROW
224                        ? ENTRIES_PER_ROW
225                        : init->aio_num & ~ENTRIES_PER_ROW);
226     }
227
228   if (init->aio_idle_time != 0)
229     optim.aio_idle_time = init->aio_idle_time;
230
231   /* Release the mutex.  */
232   pthread_mutex_unlock (&__aio_requests_mutex);
233 }
234 weak_alias (__aio_init, aio_init)
235
236
237 /* The main function of the async I/O handling.  It enqueues requests
238    and if necessary starts and handles threads.  */
239 struct requestlist *
240 internal_function
241 __aio_enqueue_request (aiocb_union *aiocbp, int operation)
242 {
243   int result = 0;
244   int policy, prio;
245   struct sched_param param;
246   struct requestlist *last, *runp, *newp;
247   int running = no;
248
249   if (aiocbp->aiocb.aio_reqprio < 0
250       || aiocbp->aiocb.aio_reqprio > AIO_PRIO_DELTA_MAX)
251     {
252       /* Invalid priority value.  */
253       __set_errno (EINVAL);
254       aiocbp->aiocb.__error_code = EINVAL;
255       aiocbp->aiocb.__return_value = -1;
256       return NULL;
257     }
258
259   /* Compute priority for this request.  */
260   pthread_getschedparam (pthread_self (), &policy, &param);
261   prio = param.sched_priority - aiocbp->aiocb.aio_reqprio;
262
263   /* Get the mutex.  */
264   pthread_mutex_lock (&__aio_requests_mutex);
265
266   last = NULL;
267   runp = requests;
268   /* First look whether the current file descriptor is currently
269      worked with.  */
270   while (runp != NULL
271          && runp->aiocbp->aiocb.aio_fildes < aiocbp->aiocb.aio_fildes)
272     {
273       last = runp;
274       runp = runp->next_fd;
275     }
276
277   /* Get a new element for the waiting list.  */
278   newp = get_elem ();
279   if (newp == NULL)
280     {
281       pthread_mutex_unlock (&__aio_requests_mutex);
282       __set_errno (EAGAIN);
283       return NULL;
284     }
285   newp->aiocbp = aiocbp;
286   newp->caller_pid = (aiocbp->aiocb.aio_sigevent.sigev_notify == SIGEV_SIGNAL
287                       ? getpid () : 0);
288   newp->waiting = NULL;
289
290   aiocbp->aiocb.__abs_prio = prio;
291   aiocbp->aiocb.__policy = policy;
292   aiocbp->aiocb.aio_lio_opcode = operation;
293   aiocbp->aiocb.__error_code = EINPROGRESS;
294   aiocbp->aiocb.__return_value = 0;
295
296   if (runp != NULL
297       && runp->aiocbp->aiocb.aio_fildes == aiocbp->aiocb.aio_fildes)
298     {
299       /* The current file descriptor is worked on.  It makes no sense
300          to start another thread since this new thread would fight
301          with the running thread for the resources.  But we also cannot
302          say that the thread processing this desriptor shall immediately
303          after finishing the current job process this request if there
304          are other threads in the running queue which have a higher
305          priority.  */
306
307       /* Simply enqueue it after the running one according to the
308          priority.  */
309       while (runp->next_prio != NULL
310              && runp->next_prio->aiocbp->aiocb.__abs_prio >= prio)
311         runp = runp->next_prio;
312
313       newp->next_prio = runp->next_prio;
314       runp->next_prio = newp;
315
316       running = queued;
317     }
318   else
319     {
320       running = yes;
321       /* Enqueue this request for a new descriptor.  */
322       if (last == NULL)
323         {
324           newp->last_fd = NULL;
325           newp->next_fd = requests;
326           if (requests != NULL)
327             requests->last_fd = newp;
328           requests = newp;
329         }
330       else
331         {
332           newp->next_fd = last->next_fd;
333           newp->last_fd = last;
334           last->next_fd = newp;
335           if (newp->next_fd != NULL)
336             newp->next_fd->last_fd = newp;
337         }
338
339       newp->next_prio = NULL;
340     }
341
342   if (running == yes)
343     {
344       /* We try to create a new thread for this file descriptor.  The
345          function which gets called will handle all available requests
346          for this descriptor and when all are processed it will
347          terminate.
348
349          If no new thread can be created or if the specified limit of
350          threads for AIO is reached we queue the request.  */
351
352       /* See if we need to and are able to create a thread.  */
353       if (nthreads < optim.aio_threads && idle_thread_count == 0)
354         {
355           pthread_t thid;
356           pthread_attr_t attr;
357
358           /* Make sure the thread is created detached.  */
359           pthread_attr_init (&attr);
360           pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
361
362           /* Now try to start a thread.  */
363           if (pthread_create (&thid, &attr, handle_fildes_io, newp) == 0)
364             {
365               /* We managed to enqueue the request.  All errors which can
366                  happen now can be recognized by calls to `aio_return' and
367                  `aio_error'.  */
368               running = allocated;
369               ++nthreads;
370             }
371           else if (nthreads == 0)
372             /* We cannot create a thread in the moment and there is
373                also no thread running.  This is a problem.  `errno' is
374                set to EAGAIN if this is only a temporary problem.  */
375             result = -1;
376         }
377     }
378
379   /* Enqueue the request in the run queue if it is not yet running.  */
380   if (running == yes && result == 0)
381     {
382       add_request_to_runlist (newp);
383
384       /* If there is a thread waiting for work, then let it know that we
385          have just given it something to do. */
386       if (idle_thread_count > 0)
387         pthread_cond_signal (&__aio_new_request_notification);
388     }
389
390   if (result == 0)
391     newp->running = running;
392   else
393     {
394       /* Something went wrong.  */
395       __aio_free_request (newp);
396       newp = NULL;
397     }
398
399   /* Release the mutex.  */
400   pthread_mutex_unlock (&__aio_requests_mutex);
401
402   return newp;
403 }
404
405
406 static void *
407 handle_fildes_io (void *arg)
408 {
409   pthread_t self = pthread_self ();
410   struct sched_param param;
411   struct requestlist *runp = (struct requestlist *) arg;
412   aiocb_union *aiocbp;
413   int policy;
414   int fildes;
415
416   pthread_getschedparam (self, &policy, &param);
417
418   do
419     {
420       /* If runp is NULL, then we were created to service the work queue
421          in general, not to handle any particular request. In that case we
422          skip the "do work" stuff on the first pass, and go directly to the
423          "get work off the work queue" part of this loop, which is near the
424          end. */
425       if (runp == NULL)
426         pthread_mutex_lock (&__aio_requests_mutex);
427       else
428         {
429           /* Update our variables.  */
430           aiocbp = runp->aiocbp;
431           fildes = aiocbp->aiocb.aio_fildes;
432
433           /* Change the priority to the requested value (if necessary).  */
434           if (aiocbp->aiocb.__abs_prio != param.sched_priority
435               || aiocbp->aiocb.__policy != policy)
436             {
437               param.sched_priority = aiocbp->aiocb.__abs_prio;
438               policy = aiocbp->aiocb.__policy;
439               pthread_setschedparam (self, policy, &param);
440             }
441
442           /* Process request pointed to by RUNP.  We must not be disturbed
443              by signals.  */
444           if ((aiocbp->aiocb.aio_lio_opcode & 127) == LIO_READ)
445             {
446               if (aiocbp->aiocb.aio_lio_opcode & 128)
447                 aiocbp->aiocb.__return_value =
448                   TEMP_FAILURE_RETRY (__pread64 (fildes, (void *)
449                                                  aiocbp->aiocb64.aio_buf,
450                                                  aiocbp->aiocb64.aio_nbytes,
451                                                  aiocbp->aiocb64.aio_offset));
452               else
453                 aiocbp->aiocb.__return_value =
454                   TEMP_FAILURE_RETRY (pread (fildes,
455                                              (void *) aiocbp->aiocb.aio_buf,
456                                              aiocbp->aiocb.aio_nbytes,
457                                              aiocbp->aiocb.aio_offset));
458
459               if (aiocbp->aiocb.__return_value == -1 && errno == ESPIPE)
460                 /* The Linux kernel is different from others.  It returns
461                    ESPIPE if using pread on a socket.  Other platforms
462                    simply ignore the offset parameter and behave like
463                    read.  */
464                 aiocbp->aiocb.__return_value =
465                   TEMP_FAILURE_RETRY (read (fildes,
466                                             (void *) aiocbp->aiocb64.aio_buf,
467                                             aiocbp->aiocb64.aio_nbytes));
468             }
469           else if ((aiocbp->aiocb.aio_lio_opcode & 127) == LIO_WRITE)
470             {
471               if (aiocbp->aiocb.aio_lio_opcode & 128)
472                 aiocbp->aiocb.__return_value =
473                   TEMP_FAILURE_RETRY (__pwrite64 (fildes, (const void *)
474                                                   aiocbp->aiocb64.aio_buf,
475                                                   aiocbp->aiocb64.aio_nbytes,
476                                                   aiocbp->aiocb64.aio_offset));
477               else
478                 aiocbp->aiocb.__return_value =
479                   TEMP_FAILURE_RETRY (pwrite (fildes, (const void *)
480                                               aiocbp->aiocb.aio_buf,
481                                               aiocbp->aiocb.aio_nbytes,
482                                               aiocbp->aiocb.aio_offset));
483
484               if (aiocbp->aiocb.__return_value == -1 && errno == ESPIPE)
485                 /* The Linux kernel is different from others.  It returns
486                    ESPIPE if using pwrite on a socket.  Other platforms
487                    simply ignore the offset parameter and behave like
488                    write.  */
489                 aiocbp->aiocb.__return_value =
490                   TEMP_FAILURE_RETRY (write (fildes,
491                                              (void *) aiocbp->aiocb64.aio_buf,
492                                              aiocbp->aiocb64.aio_nbytes));
493             }
494           else if (aiocbp->aiocb.aio_lio_opcode == LIO_DSYNC)
495             aiocbp->aiocb.__return_value =
496               TEMP_FAILURE_RETRY (fdatasync (fildes));
497           else if (aiocbp->aiocb.aio_lio_opcode == LIO_SYNC)
498             aiocbp->aiocb.__return_value =
499               TEMP_FAILURE_RETRY (fsync (fildes));
500           else
501             {
502               /* This is an invalid opcode.  */
503               aiocbp->aiocb.__return_value = -1;
504               __set_errno (EINVAL);
505             }
506
507           /* Get the mutex.  */
508           pthread_mutex_lock (&__aio_requests_mutex);
509
510           if (aiocbp->aiocb.__return_value == -1)
511             aiocbp->aiocb.__error_code = errno;
512           else
513             aiocbp->aiocb.__error_code = 0;
514
515           /* Send the signal to notify about finished processing of the
516              request.  */
517           __aio_notify (runp);
518
519           /* Now dequeue the current request.  */
520           if (runp->next_prio == NULL)
521             {
522               /* No outstanding request for this descriptor.  Remove this
523                  descriptor from the list.  */
524               if (runp->next_fd != NULL)
525                 runp->next_fd->last_fd = runp->last_fd;
526               if (runp->last_fd != NULL)
527                 runp->last_fd->next_fd = runp->next_fd;
528               else
529                 requests = runp->next_fd;
530             }
531           else
532             {
533               runp->next_prio->last_fd = runp->last_fd;
534               runp->next_prio->next_fd = runp->next_fd;
535               runp->next_prio->running = yes;
536               if (runp->next_fd != NULL)
537                 runp->next_fd->last_fd = runp->next_prio;
538               if (runp->last_fd != NULL)
539                 runp->last_fd->next_fd = runp->next_prio;
540               else
541                 requests = runp->next_prio;
542               add_request_to_runlist (runp->next_prio);
543             }
544
545           /* Free the old element.  */
546           __aio_free_request (runp);
547         }
548
549       runp = runlist;
550
551       /* If the runlist is empty, then we sleep for a while, waiting for
552          something to arrive in it. */
553       if (runp == NULL && optim.aio_idle_time >= 0)
554         {
555           struct timeval now;
556           struct timespec wakeup_time;
557
558           ++idle_thread_count;
559           gettimeofday (&now, NULL);
560           wakeup_time.tv_sec = now.tv_sec + optim.aio_idle_time;
561           wakeup_time.tv_nsec = now.tv_usec * 1000;
562           if (wakeup_time.tv_nsec > 1000000000)
563             {
564               wakeup_time.tv_nsec -= 1000000000;
565               ++wakeup_time.tv_sec;
566             }
567           pthread_cond_timedwait (&__aio_new_request_notification,
568                                   &__aio_requests_mutex,
569                                   &wakeup_time);
570           --idle_thread_count;
571           runp = runlist;
572         }
573
574       if (runp == NULL)
575         --nthreads;
576       else
577         {
578           assert (runp->running == yes);
579           runp->running = allocated;
580           runlist = runp->next_run;
581
582           /* If we have a request to process, and there's still another in
583              the run list, then we need to either wake up or create a new
584              thread to service the request that is still in the run list. */
585           if (runlist != NULL)
586             {
587               /* There are at least two items in the work queue to work on.
588                  If there are other idle threads, then we should wake them
589                  up for these other work elements; otherwise, we should try
590                  to create a new thread. */
591               if (idle_thread_count > 0)
592                 pthread_cond_signal (&__aio_new_request_notification);
593               else if (nthreads < optim.aio_threads)
594                 {
595                   pthread_t thid;
596                   pthread_attr_t attr;
597
598                   /* Make sure the thread is created detached.  */
599                   pthread_attr_init (&attr);
600                   pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
601
602                   /* Now try to start a thread. If we fail, no big deal,
603                      because we know that there is at least one thread (us)
604                      that is working on AIO operations. */
605                   if (pthread_create (&thid, &attr, handle_fildes_io, NULL)
606                       == 0)
607                     ++nthreads;
608                 }
609             }
610         }
611
612       /* Release the mutex.  */
613       pthread_mutex_unlock (&__aio_requests_mutex);
614     }
615   while (runp != NULL);
616
617   pthread_exit (NULL);
618 }
619
620
621 /* Free allocated resources.  */
622 static void
623 __attribute__ ((unused))
624 free_res (void)
625 {
626   size_t row;
627
628   /* The first block of rows as specified in OPTIM is allocated in
629      one chunk.  */
630   free (pool[0]);
631
632   for (row = optim.aio_num / ENTRIES_PER_ROW; row < pool_tab_size; ++row)
633     free (pool[row]);
634
635   free (pool);
636 }
637 text_set_element (__libc_subfreeres, free_res);
638
639
640 /* Add newrequest to the runlist. The __abs_prio flag of newrequest must
641    be correctly set to do this. Also, you had better set newrequest's
642    "running" flag to "yes" before you release your lock or you'll throw an
643    assertion. */
644 static void
645 add_request_to_runlist (struct requestlist *newrequest)
646 {
647   int prio = newrequest->aiocbp->aiocb.__abs_prio;
648   struct requestlist *runp;
649
650   if (runlist == NULL || runlist->aiocbp->aiocb.__abs_prio < prio)
651     {
652       newrequest->next_run = runlist;
653       runlist = newrequest;
654     }
655   else
656     {
657       runp = runlist;
658
659       while (runp->next_run != NULL
660              && runp->next_run->aiocbp->aiocb.__abs_prio >= prio)
661         runp = runp->next_run;
662
663       newrequest->next_run = runp->next_run;
664       runp->next_run = newrequest;
665     }
666 }