(__ieee754_sqrt): Remove reference to usqrt.
[kopensolaris-gnu/glibc.git] / rt / aio_misc.c
1 /* Handle general operations.
2    Copyright (C) 1997, 1998, 1999, 2000, 2001 Free Software Foundation, Inc.
3    This file is part of the GNU C Library.
4    Contributed by Ulrich Drepper <drepper@cygnus.com>, 1997.
5
6    The GNU C Library is free software; you can redistribute it and/or
7    modify it under the terms of the GNU Library General Public License as
8    published by the Free Software Foundation; either version 2 of the
9    License, or (at your option) any later version.
10
11    The GNU C Library is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14    Library General Public License for more details.
15
16    You should have received a copy of the GNU Library General Public
17    License along with the GNU C Library; see the file COPYING.LIB.  If not,
18    write to the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19    Boston, MA 02111-1307, USA.  */
20
21 #include <aio.h>
22 #include <assert.h>
23 #include <errno.h>
24 #include <limits.h>
25 #include <pthread.h>
26 #include <stdlib.h>
27 #include <unistd.h>
28 #include <sys/stat.h>
29 #include <sys/time.h>
30
31 #include "aio_misc.h"
32
33 static void add_request_to_runlist (struct requestlist *newrequest);
34
35 /* Pool of request list entries.  */
36 static struct requestlist **pool;
37
38 /* Number of total and allocated pool entries.  */
39 static size_t pool_max_size;
40 static size_t pool_size;
41
42 /* We implement a two dimensional array but allocate each row separately.
43    The macro below determines how many entries should be used per row.
44    It should better be a power of two.  */
45 #define ENTRIES_PER_ROW 32
46
47 /* How many rows we allocate at once.  */
48 #define ROWS_STEP       8
49
50 /* List of available entries.  */
51 static struct requestlist *freelist;
52
53 /* List of request waiting to be processed.  */
54 static struct requestlist *runlist;
55
56 /* Structure list of all currently processed requests.  */
57 static struct requestlist *requests;
58
59 /* Number of threads currently running.  */
60 static int nthreads;
61
62 /* Number of threads waiting for work to arrive. */
63 static int idle_thread_count;
64
65
66 /* These are the values used to optimize the use of AIO.  The user can
67    overwrite them by using the `aio_init' function.  */
68 static struct aioinit optim =
69 {
70   20,   /* int aio_threads;     Maximal number of threads.  */
71   64,   /* int aio_num;         Number of expected simultanious requests. */
72   0,
73   0,
74   0,
75   0,
76   1,
77   0
78 };
79
80
81 /* Since the list is global we need a mutex protecting it.  */
82 pthread_mutex_t __aio_requests_mutex = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;
83
84 /* When you add a request to the list and there are idle threads present,
85    you signal this condition variable. When a thread finishes work, it waits
86    on this condition variable for a time before it actually exits. */
87 pthread_cond_t __aio_new_request_notification = PTHREAD_COND_INITIALIZER;
88
89
90 /* Functions to handle request list pool.  */
91 static struct requestlist *
92 get_elem (void)
93 {
94   struct requestlist *result;
95
96   if (freelist == NULL)
97     {
98       struct requestlist *new_row;
99       int cnt;
100
101       assert (sizeof (struct aiocb) == sizeof (struct aiocb64));
102
103       if (pool_size + 1 >= pool_max_size)
104         {
105           size_t new_max_size = pool_max_size + ROWS_STEP;
106           struct requestlist **new_tab;
107
108           new_tab = (struct requestlist **)
109             realloc (pool, new_max_size * sizeof (struct requestlist *));
110
111           if (new_tab == NULL)
112             return NULL;
113
114           pool_max_size = new_max_size;
115           pool = new_tab;
116         }
117
118       /* Allocate the new row.  */
119       cnt = pool_size == 0 ? optim.aio_num : ENTRIES_PER_ROW;
120       new_row = (struct requestlist *) calloc (cnt,
121                                                sizeof (struct requestlist));
122       if (new_row == NULL)
123         return NULL;
124
125       pool[pool_size++] = new_row;
126
127       /* Put all the new entries in the freelist.  */
128       do
129         {
130           new_row->next_prio = freelist;
131           freelist = new_row++;
132         }
133       while (--cnt > 0);
134     }
135
136   result = freelist;
137   freelist = freelist->next_prio;
138
139   return result;
140 }
141
142
143 void
144 internal_function
145 __aio_free_request (struct requestlist *elem)
146 {
147   elem->running = no;
148   elem->next_prio = freelist;
149   freelist = elem;
150 }
151
152
153 struct requestlist *
154 internal_function
155 __aio_find_req (aiocb_union *elem)
156 {
157   struct requestlist *runp = requests;
158   int fildes = elem->aiocb.aio_fildes;
159
160   while (runp != NULL && runp->aiocbp->aiocb.aio_fildes < fildes)
161     runp = runp->next_fd;
162
163   if (runp != NULL)
164     {
165       if (runp->aiocbp->aiocb.aio_fildes != fildes)
166         runp = NULL;
167       else
168         while (runp != NULL && runp->aiocbp != elem)
169           runp = runp->next_prio;
170     }
171
172   return runp;
173 }
174
175
176 struct requestlist *
177 internal_function
178 __aio_find_req_fd (int fildes)
179 {
180   struct requestlist *runp = requests;
181
182   while (runp != NULL && runp->aiocbp->aiocb.aio_fildes < fildes)
183     runp = runp->next_fd;
184
185   return (runp != NULL && runp->aiocbp->aiocb.aio_fildes == fildes
186           ? runp : NULL);
187 }
188
189
190 void
191 internal_function
192 __aio_remove_request (struct requestlist *last, struct requestlist *req,
193                       int all)
194 {
195   assert (req->running == yes || req->running == queued
196           || req->running == done);
197
198   if (last != NULL)
199     last->next_prio = all ? NULL : req->next_prio;
200   else
201     {
202       if (all || req->next_prio == NULL)
203         {
204           if (req->last_fd != NULL)
205             req->last_fd->next_fd = req->next_fd;
206           else
207             requests = req->next_fd;
208           if (req->next_fd != NULL)
209             req->next_fd->last_fd = req->last_fd;
210         }
211       else
212         {
213           if (req->last_fd != NULL)
214             req->last_fd->next_fd = req->next_prio;
215           else
216             requests = req->next_prio;
217
218           if (req->next_fd != NULL)
219             req->next_fd->last_fd = req->next_prio;
220
221           req->next_prio->last_fd = req->last_fd;
222           req->next_prio->next_fd = req->next_fd;
223
224           /* Mark this entry as runnable.  */
225           req->next_prio->running = yes;
226         }
227
228       if (req->running == yes)
229         {
230           struct requestlist *runp = runlist;
231
232           last = NULL;
233           while (runp != NULL)
234             {
235               if (runp == req)
236                 {
237                   if (last == NULL)
238                     runlist = runp->next_run;
239                   else
240                     last->next_run = runp->next_run;
241                   break;
242                 }
243               last = runp;
244               runp = runp->next_run;
245             }
246         }
247     }
248 }
249
250
251 /* The thread handler.  */
252 static void *handle_fildes_io (void *arg);
253
254
255 /* User optimization.  */
256 void
257 __aio_init (const struct aioinit *init)
258 {
259   /* Get the mutex.  */
260   pthread_mutex_lock (&__aio_requests_mutex);
261
262   /* Only allow writing new values if the table is not yet allocated.  */
263   if (pool == NULL)
264     {
265       optim.aio_threads = init->aio_threads < 1 ? 1 : init->aio_threads;
266       optim.aio_num = (init->aio_num < ENTRIES_PER_ROW
267                        ? ENTRIES_PER_ROW
268                        : init->aio_num & ~ENTRIES_PER_ROW);
269     }
270
271   if (init->aio_idle_time != 0)
272     optim.aio_idle_time = init->aio_idle_time;
273
274   /* Release the mutex.  */
275   pthread_mutex_unlock (&__aio_requests_mutex);
276 }
277 weak_alias (__aio_init, aio_init)
278
279
280 /* The main function of the async I/O handling.  It enqueues requests
281    and if necessary starts and handles threads.  */
282 struct requestlist *
283 internal_function
284 __aio_enqueue_request (aiocb_union *aiocbp, int operation)
285 {
286   int result = 0;
287   int policy, prio;
288   struct sched_param param;
289   struct requestlist *last, *runp, *newp;
290   int running = no;
291
292   if (operation == LIO_SYNC || operation == LIO_DSYNC)
293     aiocbp->aiocb.aio_reqprio = 0;
294   else if (aiocbp->aiocb.aio_reqprio < 0
295            || aiocbp->aiocb.aio_reqprio > AIO_PRIO_DELTA_MAX)
296     {
297       /* Invalid priority value.  */
298       __set_errno (EINVAL);
299       aiocbp->aiocb.__error_code = EINVAL;
300       aiocbp->aiocb.__return_value = -1;
301       return NULL;
302     }
303
304   /* Compute priority for this request.  */
305   pthread_getschedparam (pthread_self (), &policy, &param);
306   prio = param.sched_priority - aiocbp->aiocb.aio_reqprio;
307
308   /* Get the mutex.  */
309   pthread_mutex_lock (&__aio_requests_mutex);
310
311   last = NULL;
312   runp = requests;
313   /* First look whether the current file descriptor is currently
314      worked with.  */
315   while (runp != NULL
316          && runp->aiocbp->aiocb.aio_fildes < aiocbp->aiocb.aio_fildes)
317     {
318       last = runp;
319       runp = runp->next_fd;
320     }
321
322   /* Get a new element for the waiting list.  */
323   newp = get_elem ();
324   if (newp == NULL)
325     {
326       pthread_mutex_unlock (&__aio_requests_mutex);
327       __set_errno (EAGAIN);
328       return NULL;
329     }
330   newp->aiocbp = aiocbp;
331   newp->caller_pid = (aiocbp->aiocb.aio_sigevent.sigev_notify == SIGEV_SIGNAL
332                       ? getpid () : 0);
333   newp->waiting = NULL;
334
335   aiocbp->aiocb.__abs_prio = prio;
336   aiocbp->aiocb.__policy = policy;
337   aiocbp->aiocb.aio_lio_opcode = operation;
338   aiocbp->aiocb.__error_code = EINPROGRESS;
339   aiocbp->aiocb.__return_value = 0;
340
341   if (runp != NULL
342       && runp->aiocbp->aiocb.aio_fildes == aiocbp->aiocb.aio_fildes)
343     {
344       /* The current file descriptor is worked on.  It makes no sense
345          to start another thread since this new thread would fight
346          with the running thread for the resources.  But we also cannot
347          say that the thread processing this desriptor shall immediately
348          after finishing the current job process this request if there
349          are other threads in the running queue which have a higher
350          priority.  */
351
352       /* Simply enqueue it after the running one according to the
353          priority.  */
354       while (runp->next_prio != NULL
355              && runp->next_prio->aiocbp->aiocb.__abs_prio >= prio)
356         runp = runp->next_prio;
357
358       newp->next_prio = runp->next_prio;
359       runp->next_prio = newp;
360
361       running = queued;
362     }
363   else
364     {
365       running = yes;
366       /* Enqueue this request for a new descriptor.  */
367       if (last == NULL)
368         {
369           newp->last_fd = NULL;
370           newp->next_fd = requests;
371           if (requests != NULL)
372             requests->last_fd = newp;
373           requests = newp;
374         }
375       else
376         {
377           newp->next_fd = last->next_fd;
378           newp->last_fd = last;
379           last->next_fd = newp;
380           if (newp->next_fd != NULL)
381             newp->next_fd->last_fd = newp;
382         }
383
384       newp->next_prio = NULL;
385     }
386
387   if (running == yes)
388     {
389       /* We try to create a new thread for this file descriptor.  The
390          function which gets called will handle all available requests
391          for this descriptor and when all are processed it will
392          terminate.
393
394          If no new thread can be created or if the specified limit of
395          threads for AIO is reached we queue the request.  */
396
397       /* See if we need to and are able to create a thread.  */
398       if (nthreads < optim.aio_threads && idle_thread_count == 0)
399         {
400           pthread_t thid;
401           pthread_attr_t attr;
402
403           /* Make sure the thread is created detached.  */
404           pthread_attr_init (&attr);
405           pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
406
407           running = newp->running = allocated;
408
409           /* Now try to start a thread.  */
410           if (pthread_create (&thid, &attr, handle_fildes_io, newp) == 0)
411             /* We managed to enqueue the request.  All errors which can
412                happen now can be recognized by calls to `aio_return' and
413                `aio_error'.  */
414             ++nthreads;
415           else
416             {
417               /* Reset the running flag.  The new request is not running.  */
418               running = newp->running = yes;
419
420               if (nthreads == 0)
421                 /* We cannot create a thread in the moment and there is
422                    also no thread running.  This is a problem.  `errno' is
423                    set to EAGAIN if this is only a temporary problem.  */
424                 result = -1;
425             }
426         }
427     }
428
429   /* Enqueue the request in the run queue if it is not yet running.  */
430   if (running == yes && result == 0)
431     {
432       add_request_to_runlist (newp);
433
434       /* If there is a thread waiting for work, then let it know that we
435          have just given it something to do. */
436       if (idle_thread_count > 0)
437         pthread_cond_signal (&__aio_new_request_notification);
438     }
439
440   if (result == 0)
441     newp->running = running;
442   else
443     {
444       /* Something went wrong.  */
445       __aio_free_request (newp);
446       newp = NULL;
447     }
448
449   /* Release the mutex.  */
450   pthread_mutex_unlock (&__aio_requests_mutex);
451
452   return newp;
453 }
454
455
456 static void *
457 handle_fildes_io (void *arg)
458 {
459   pthread_t self = pthread_self ();
460   struct sched_param param;
461   struct requestlist *runp = (struct requestlist *) arg;
462   aiocb_union *aiocbp;
463   int policy;
464   int fildes;
465
466   pthread_getschedparam (self, &policy, &param);
467
468   do
469     {
470       /* If runp is NULL, then we were created to service the work queue
471          in general, not to handle any particular request. In that case we
472          skip the "do work" stuff on the first pass, and go directly to the
473          "get work off the work queue" part of this loop, which is near the
474          end. */
475       if (runp == NULL)
476         pthread_mutex_lock (&__aio_requests_mutex);
477       else
478         {
479           /* Hopefully this request is marked as running.  */
480           assert (runp->running == allocated);
481
482           /* Update our variables.  */
483           aiocbp = runp->aiocbp;
484           fildes = aiocbp->aiocb.aio_fildes;
485
486           /* Change the priority to the requested value (if necessary).  */
487           if (aiocbp->aiocb.__abs_prio != param.sched_priority
488               || aiocbp->aiocb.__policy != policy)
489             {
490               param.sched_priority = aiocbp->aiocb.__abs_prio;
491               policy = aiocbp->aiocb.__policy;
492               pthread_setschedparam (self, policy, &param);
493             }
494
495           /* Process request pointed to by RUNP.  We must not be disturbed
496              by signals.  */
497           if ((aiocbp->aiocb.aio_lio_opcode & 127) == LIO_READ)
498             {
499               if (aiocbp->aiocb.aio_lio_opcode & 128)
500                 aiocbp->aiocb.__return_value =
501                   TEMP_FAILURE_RETRY (__pread64 (fildes, (void *)
502                                                  aiocbp->aiocb64.aio_buf,
503                                                  aiocbp->aiocb64.aio_nbytes,
504                                                  aiocbp->aiocb64.aio_offset));
505               else
506                 aiocbp->aiocb.__return_value =
507                   TEMP_FAILURE_RETRY (pread (fildes,
508                                              (void *) aiocbp->aiocb.aio_buf,
509                                              aiocbp->aiocb.aio_nbytes,
510                                              aiocbp->aiocb.aio_offset));
511
512               if (aiocbp->aiocb.__return_value == -1 && errno == ESPIPE)
513                 /* The Linux kernel is different from others.  It returns
514                    ESPIPE if using pread on a socket.  Other platforms
515                    simply ignore the offset parameter and behave like
516                    read.  */
517                 aiocbp->aiocb.__return_value =
518                   TEMP_FAILURE_RETRY (read (fildes,
519                                             (void *) aiocbp->aiocb64.aio_buf,
520                                             aiocbp->aiocb64.aio_nbytes));
521             }
522           else if ((aiocbp->aiocb.aio_lio_opcode & 127) == LIO_WRITE)
523             {
524               if (aiocbp->aiocb.aio_lio_opcode & 128)
525                 aiocbp->aiocb.__return_value =
526                   TEMP_FAILURE_RETRY (__pwrite64 (fildes, (const void *)
527                                                   aiocbp->aiocb64.aio_buf,
528                                                   aiocbp->aiocb64.aio_nbytes,
529                                                   aiocbp->aiocb64.aio_offset));
530               else
531                 aiocbp->aiocb.__return_value =
532                   TEMP_FAILURE_RETRY (pwrite (fildes, (const void *)
533                                               aiocbp->aiocb.aio_buf,
534                                               aiocbp->aiocb.aio_nbytes,
535                                               aiocbp->aiocb.aio_offset));
536
537               if (aiocbp->aiocb.__return_value == -1 && errno == ESPIPE)
538                 /* The Linux kernel is different from others.  It returns
539                    ESPIPE if using pwrite on a socket.  Other platforms
540                    simply ignore the offset parameter and behave like
541                    write.  */
542                 aiocbp->aiocb.__return_value =
543                   TEMP_FAILURE_RETRY (write (fildes,
544                                              (void *) aiocbp->aiocb64.aio_buf,
545                                              aiocbp->aiocb64.aio_nbytes));
546             }
547           else if (aiocbp->aiocb.aio_lio_opcode == LIO_DSYNC)
548             aiocbp->aiocb.__return_value =
549               TEMP_FAILURE_RETRY (fdatasync (fildes));
550           else if (aiocbp->aiocb.aio_lio_opcode == LIO_SYNC)
551             aiocbp->aiocb.__return_value =
552               TEMP_FAILURE_RETRY (fsync (fildes));
553           else
554             {
555               /* This is an invalid opcode.  */
556               aiocbp->aiocb.__return_value = -1;
557               __set_errno (EINVAL);
558             }
559
560           /* Get the mutex.  */
561           pthread_mutex_lock (&__aio_requests_mutex);
562
563           /* In theory we would need here a write memory barrier since the
564              callers test using aio_error() whether the request finished
565              and once this value != EINPROGRESS the field __return_value
566              must be committed to memory.
567
568              But since the pthread_mutex_lock call involves write memory
569              barriers as well it is not necessary.  */
570
571           if (aiocbp->aiocb.__return_value == -1)
572             aiocbp->aiocb.__error_code = errno;
573           else
574             aiocbp->aiocb.__error_code = 0;
575
576           /* Send the signal to notify about finished processing of the
577              request.  */
578           __aio_notify (runp);
579
580           /* For debugging purposes we reset the running flag of the
581              finished request.  */
582           assert (runp->running == allocated);
583           runp->running = done;
584
585           /* Now dequeue the current request.  */
586           __aio_remove_request (NULL, runp, 0);
587           if (runp->next_prio != NULL)
588             add_request_to_runlist (runp->next_prio);
589
590           /* Free the old element.  */
591           __aio_free_request (runp);
592         }
593
594       runp = runlist;
595
596       /* If the runlist is empty, then we sleep for a while, waiting for
597          something to arrive in it. */
598       if (runp == NULL && optim.aio_idle_time >= 0)
599         {
600           struct timeval now;
601           struct timespec wakeup_time;
602
603           ++idle_thread_count;
604           gettimeofday (&now, NULL);
605           wakeup_time.tv_sec = now.tv_sec + optim.aio_idle_time;
606           wakeup_time.tv_nsec = now.tv_usec * 1000;
607           if (wakeup_time.tv_nsec > 1000000000)
608             {
609               wakeup_time.tv_nsec -= 1000000000;
610               ++wakeup_time.tv_sec;
611             }
612           pthread_cond_timedwait (&__aio_new_request_notification,
613                                   &__aio_requests_mutex,
614                                   &wakeup_time);
615           --idle_thread_count;
616           runp = runlist;
617         }
618
619       if (runp == NULL)
620         --nthreads;
621       else
622         {
623           assert (runp->running == yes);
624           runp->running = allocated;
625           runlist = runp->next_run;
626
627           /* If we have a request to process, and there's still another in
628              the run list, then we need to either wake up or create a new
629              thread to service the request that is still in the run list. */
630           if (runlist != NULL)
631             {
632               /* There are at least two items in the work queue to work on.
633                  If there are other idle threads, then we should wake them
634                  up for these other work elements; otherwise, we should try
635                  to create a new thread. */
636               if (idle_thread_count > 0)
637                 pthread_cond_signal (&__aio_new_request_notification);
638               else if (nthreads < optim.aio_threads)
639                 {
640                   pthread_t thid;
641                   pthread_attr_t attr;
642
643                   /* Make sure the thread is created detached.  */
644                   pthread_attr_init (&attr);
645                   pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
646
647                   /* Now try to start a thread. If we fail, no big deal,
648                      because we know that there is at least one thread (us)
649                      that is working on AIO operations. */
650                   if (pthread_create (&thid, &attr, handle_fildes_io, NULL)
651                       == 0)
652                     ++nthreads;
653                 }
654             }
655         }
656
657       /* Release the mutex.  */
658       pthread_mutex_unlock (&__aio_requests_mutex);
659     }
660   while (runp != NULL);
661
662   pthread_exit (NULL);
663 }
664
665
666 /* Free allocated resources.  */
667 static void
668 __attribute__ ((unused))
669 free_res (void)
670 {
671   size_t row;
672
673   for (row = 0; row < pool_max_size; ++row)
674     free (pool[row]);
675
676   free (pool);
677 }
678 text_set_element (__libc_subfreeres, free_res);
679
680
681 /* Add newrequest to the runlist. The __abs_prio flag of newrequest must
682    be correctly set to do this. Also, you had better set newrequest's
683    "running" flag to "yes" before you release your lock or you'll throw an
684    assertion. */
685 static void
686 add_request_to_runlist (struct requestlist *newrequest)
687 {
688   int prio = newrequest->aiocbp->aiocb.__abs_prio;
689   struct requestlist *runp;
690
691   if (runlist == NULL || runlist->aiocbp->aiocb.__abs_prio < prio)
692     {
693       newrequest->next_run = runlist;
694       runlist = newrequest;
695     }
696   else
697     {
698       runp = runlist;
699
700       while (runp->next_run != NULL
701              && runp->next_run->aiocbp->aiocb.__abs_prio >= prio)
702         runp = runp->next_run;
703
704       newrequest->next_run = runp->next_run;
705       runp->next_run = newrequest;
706     }
707 }