Rewrite to support lio_listio and aio_suspend.
authordrepper <drepper>
Mon, 8 Dec 1997 02:48:36 +0000 (02:48 +0000)
committerdrepper <drepper>
Mon, 8 Dec 1997 02:48:36 +0000 (02:48 +0000)
rt/aio_cancel.c
rt/aio_fsync.c
rt/aio_misc.c
rt/aio_misc.h
rt/aio_read.c
rt/aio_read64.c
rt/aio_suspend.c
rt/aio_write.c
rt/aio_write64.c
rt/lio_listio.c
rt/lio_listio64.c

index f2d9389..e802a85 100644 (file)
@@ -30,7 +30,6 @@
 #undef aio_cancel64
 
 #include <errno.h>
 #undef aio_cancel64
 
 #include <errno.h>
-#include <pthread.h>
 
 #include "aio_misc.h"
 
 
 #include "aio_misc.h"
 
@@ -43,128 +42,106 @@ aio_cancel (fildes, aiocbp)
      int fildes;
      struct aiocb *aiocbp;
 {
      int fildes;
      struct aiocb *aiocbp;
 {
-  struct aiocb *firstp;
+  struct requestlist *req = NULL;
   int result = AIO_ALLDONE;
 
   int result = AIO_ALLDONE;
 
-  /* Request the semaphore.  */
-  sem_wait (&__aio_requests_sema);
+  /* Request the mutex.  */
+  pthread_mutex_lock (&__aio_requests_mutex);
 
 
-  /* Search for the list of requests associated with the given file
-     descriptor.  */
-  for (firstp = (struct aiocb *) __aio_requests; firstp != NULL;
-       firstp = firstp->__next_fd)
-    if (firstp->aio_fildes == fildes)
-      break;
-
-  /* If the file descriptor is not found all work seems to done
-     already.  Otherwise try to cancel the request(s).  */
-  if (firstp != NULL)
+  /* We are asked to cancel a specific AIO request.  */
+  if (aiocbp != NULL)
     {
     {
-      if (aiocbp != NULL)
+      /* If the AIO request is not for this descriptor it has no value
+        to look for the request block.  */
+      if (aiocbp->aio_fildes == fildes)
        {
        {
-         /* Locate the entry corresponding to the AIOCBP parameter.  */
-         if (aiocbp == firstp)
-           /* The requests is currently handled, therefore don't
-              cancel it and signal this to the user.  */
-           result = AIO_NOTCANCELED;
-         else
+         struct requestlist *last = NULL;
+
+         req = __aio_find_req_fd (fildes);
+
+         while (req->aiocbp != (aiocb_union *) aiocbp)
            {
            {
-             while (firstp->__next_prio != NULL
-                    && aiocbp != firstp->__next_prio)
-               firstp = firstp->__next_prio;
-
-             if (firstp->__next_prio != NULL)
-               {
-                 /* The request the user wants to cancel is in the
-                    queue.  Simply remove it.  */
-                 firstp->__next_prio = aiocbp->__next_prio;
-
-                 /* Mark as canceled.  */
-                 aiocbp->__error_code = ECANCELED;
-                 aiocbp->__return_value = -1;
-
-                 /* Send the signal to notify about canceled
-                    processing of the request.  */
-                 if (aiocbp->aio_sigevent.sigev_notify == SIGEV_THREAD)
-                   {
-                     /* We have to start a thread.  */
-                     pthread_t tid;
-                     pthread_attr_t attr, *pattr;
-
-                     pattr = (pthread_attr_t *)
-                       aiocbp->aio_sigevent.sigev_notify_attributes;
-                     if (pattr == NULL)
-                       {
-                         pthread_attr_init (&attr);
-                         pthread_attr_setdetachstate (&attr,
-                                                      PTHREAD_CREATE_DETACHED);
-                         pattr = &attr;
-                       }
-
-                     pthread_create (&tid, pattr,
-                                     (void *(*) (void *))
-                                     aiocbp->aio_sigevent.sigev_notify_function,
-                                     aiocbp->aio_sigevent.sigev_value.sival_ptr);
-                   }
-                 else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_SIGNAL)
-                   /* We have to send a signal.  */
-                   __aio_sigqueue (aiocbp->aio_sigevent.sigev_signo,
-                                   aiocbp->aio_sigevent.sigev_value);
-
-                 result = AIO_CANCELED;
-               }
+             last = req;
+             req = req->next_prio;
            }
            }
-       }
-      else
-       {
-         /* First dequeue all waiting requests.  */
-         aiocbp = firstp;
 
 
-         while ((firstp = firstp->__next_prio) != NULL)
+         /* Don't remove the entry if a thread is already working on it.  */
+         if (req->running == allocated)
+           result = AIO_NOTCANCELED;
+         else
            {
            {
-             firstp->__error_code = ECANCELED;
-             firstp->__return_value = -1;
-
-
-             /* Send the signal to notify about canceled processing
-                of the request.  */
-             if (firstp->aio_sigevent.sigev_notify == SIGEV_THREAD)
-               {
-                 /* We have to start a thread.  */
-                 pthread_t tid;
-                 pthread_attr_t attr, *pattr;
-
-                 pattr = (pthread_attr_t *)
-                   aiocbp->aio_sigevent.sigev_notify_attributes;
-                 if (pattr == NULL)
-                   {
-                     pthread_attr_init (&attr);
-                     pthread_attr_setdetachstate (&attr,
-                                                  PTHREAD_CREATE_DETACHED);
-                     pattr = &attr;
-                   }
-
-                 pthread_create (&tid, pattr,
-                                 (void *(*) (void *))
-                                 firstp->aio_sigevent.sigev_notify_function,
-                                 firstp->aio_sigevent.sigev_value.sival_ptr);
-               }
-             else if (firstp->aio_sigevent.sigev_notify == SIGEV_SIGNAL)
-               /* We have to send a signal.  */
-               __aio_sigqueue (firstp->aio_sigevent.sigev_signo,
-                               firstp->aio_sigevent.sigev_value);
+             /* We can remove the entry.  */
+             if (last != NULL)
+               last->next_prio = req->next_prio;
+             else
+               if (req->next_prio == NULL)
+                 {
+                   if (req->last_fd != NULL)
+                     req->last_fd->next_fd = req->next_fd;
+                   if (req->next_fd != NULL)
+                     req->next_fd->last_fd = req->last_fd;
+                 }
+               else
+                 {
+                   if (req->last_fd != NULL)
+                     req->last_fd->next_fd = req->next_prio;
+                   if (req->next_fd != NULL)
+                     req->next_fd->last_fd = req->next_prio;
+                   req->next_prio->last_fd = req->last_fd;
+                   req->next_prio->next_fd = req->next_fd;
+
+                   /* Mark this entry as runnable.  */
+                   req->next_prio->running = yes;
+                 }
+
+             result = AIO_CANCELED;
            }
 
            }
 
-         /* We have to signal that not all requests could be canceled
-            since the first requests is currently processed.  */
-         result = AIO_NOTCANCELED;
-
-         aiocbp->__next_prio = NULL;
+         req->next_prio = NULL;
        }
     }
        }
     }
+  else
+    {
+      /* Find the beginning of the list of all requests for this
+        desriptor.  */
+      req = __aio_find_req_fd (fildes);
+
+      /* If any request is worked on by a thread it must be the first.
+        So either we can delete all requests or all but the first.  */
+      if (req != NULL)
+       if (req->running == allocated)
+         {
+           struct requestlist *old = req;
+           req = req->next_prio;
+           old->next_prio = NULL;
+
+           result = AIO_NOTCANCELED;
+         }
+       else
+         {
+           /* Remove entry from the file descriptor list.  */
+           if (req->last_fd != NULL)
+             req->last_fd->next_fd = req->next_fd;
+           if (req->next_fd != NULL)
+             req->next_fd->last_fd = req->last_fd;
+
+           result = AIO_CANCELED;
+         }
+    }
+
+  /* Mark requests as canceled and send signal.  */
+  while (req != NULL)
+    {
+      struct requestlist *old = req;
+      req->aiocbp->aiocb.__error_code = ECANCELED;
+      req->aiocbp->aiocb.__return_value = -1;
+      __aio_notify (req);
+      req = req->next_prio;
+      __aio_free_request (old);
+    }
 
 
-  /* Release the semaphore.  */
-  sem_post (&__aio_requests_sema);
+  /* Release the mutex.  */
+  pthread_mutex_unlock (&__aio_requests_mutex);
 
   return result;
 }
 
   return result;
 }
index 6daaca1..c813654 100644 (file)
@@ -36,7 +36,7 @@ int
 aio_fsync (int op, struct aiocb *aiocbp)
 {
   return __aio_enqueue_request ((aiocb_union *) aiocbp,
 aio_fsync (int op, struct aiocb *aiocbp)
 {
   return __aio_enqueue_request ((aiocb_union *) aiocbp,
-                               op == O_SYNC ? __LIO_SYNC : __LIO_DSYNC, 1);
+                               op == O_SYNC ? LIO_SYNC : LIO_DSYNC) != NULL;
 }
 
 weak_alias (aio_fsync, aio_fsync64)
 }
 
 weak_alias (aio_fsync, aio_fsync64)
index e4bb12c..6ea30c2 100644 (file)
@@ -21,7 +21,6 @@
 #include <aio.h>
 #include <errno.h>
 #include <pthread.h>
 #include <aio.h>
 #include <errno.h>
 #include <pthread.h>
-#include <semaphore.h>
 #include <stdlib.h>
 #include <unistd.h>
 #include <sys/stat.h>
 #include <stdlib.h>
 #include <unistd.h>
 #include <sys/stat.h>
 #include "aio_misc.h"
 
 
 #include "aio_misc.h"
 
 
-/* We need a list of pending operations.  This is sorted according to
-   the priority given in the aio_reqprio member.  */
-aiocb_union *__aio_requests;
+/* Pool of request list entries.  */
+static struct requestlist **pool;
 
 
-/* Since the list is global we need a semaphore protecting it.  */
-sem_t __aio_requests_sema;
+/* Number of total and allocated pool entries.  */
+static size_t pool_tab_size;
+static size_t pool_size;
 
 
+/* We implement a two dimensional array but allocate each row separately.
+   The macro below determines how many entries should be used per row.
+   It should better be a power of two.  */
+#define ENTRIES_PER_ROW        16
 
 
-/* The initialization function.  It gets automatically called if any
-   aio_* function is used in the program.  */
-static void
-__attribute__ ((unused))
-aio_initialize (void)
+/* The row table is incremented in units of this.  */
+#define ROW_STEP       8
+
+/* List of available entries.  */
+static struct requestlist *freelist;
+
+/* List of request waiting to be processed.  */
+static struct requestlist *runlist;
+
+/* Structure list of all currently processed requests.  */
+static struct requestlist *requests;
+
+/* Number of threads currently running.  */
+static int nthreads;
+
+
+/* These are the values used to optimize the use of AIO.  The user can
+   overwrite them by using the `aio_init' function.  */
+static struct aioinit optim =
+{
+  20,  /* int aio_threads;     Maximal number of threads.  */
+  256, /* int aio_num;         Number of expected simultanious requests. */
+  0,
+  0,
+  0,
+  0,
+  { 0, }
+};
+
+
+/* Since the list is global we need a mutex protecting it.  */
+pthread_mutex_t __aio_requests_mutex = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;
+
+
+/* Functions to handle request list pool.  */
+static struct requestlist *
+get_elem (void)
 {
 {
-  /* Initialize the semaphore.  We allow exactly one user at a time.  */
-  sem_init (&__aio_requests_sema, 0, 1);
+  struct requestlist *result;
+
+  if (freelist == NULL)
+    {
+      struct requestlist *new_row;
+      size_t new_size;
+
+      /* Compute new size.  */
+      new_size = pool_size ? pool_size + ENTRIES_PER_ROW : optim.aio_num;
+
+      if ((new_size / ENTRIES_PER_ROW) >= pool_tab_size)
+       {
+         size_t new_tab_size = new_size / ENTRIES_PER_ROW;
+         struct requestlist **new_tab;
+
+         new_tab = (struct requestlist **)
+           realloc (pool, (new_tab_size * sizeof (struct requestlist *)));
+
+         if (new_tab == NULL)
+           return NULL;
+
+         pool_tab_size = new_tab_size;
+         pool = new_tab;
+       }
+
+      if (pool_size == 0)
+       {
+         size_t cnt;
+
+         new_row = (struct requestlist *)
+           calloc (new_size, sizeof (struct requestlist));
+
+         if (new_row == NULL)
+           return NULL;
+
+         for (cnt = 0; cnt < new_size / ENTRIES_PER_ROW; ++cnt)
+           pool[cnt] = &new_row[cnt * ENTRIES_PER_ROW];
+       }
+      else
+       {
+         /* Allocat one new row.  */
+         new_row = (struct requestlist *)
+           calloc (ENTRIES_PER_ROW, sizeof (struct requestlist));
+         if (new_row == NULL)
+           return NULL;
+
+         pool[new_size / ENTRIES_PER_ROW] = new_row;
+       }
+
+      /* Put all the new entries in the freelist.  */
+      do
+       {
+         new_row->next_prio = freelist;
+         freelist = new_row++;
+       }
+      while (++pool_size < new_size);
+    }
+
+  result = freelist;
+  freelist = freelist->next_prio;
+
+  return result;
 }
 
 }
 
-text_set_element (__libc_subinit, aio_initialize);
+
+void
+__aio_free_req (struct requestlist *elem)
+{
+  elem->running = no;
+  elem->next_prio = freelist;
+  freelist = elem;
+}
+
+
+struct requestlist *
+__aio_find_req (aiocb_union *elem)
+{
+  struct requestlist *runp = requests;
+  int fildes = elem->aiocb.aio_fildes;
+
+  while (runp != NULL && runp->aiocbp->aiocb.aio_fildes < fildes)
+    runp = runp->next_fd;
+
+  if (runp != NULL)
+    if (runp->aiocbp->aiocb.aio_fildes != fildes)
+      runp = NULL;
+    else
+      while (runp != NULL && runp->aiocbp != elem)
+       runp = runp->next_prio;
+
+  return runp;
+}
+
+
+struct requestlist *
+__aio_find_req_fd (int fildes)
+{
+  struct requestlist *runp = requests;
+
+  while (runp != NULL && runp->aiocbp->aiocb.aio_fildes < fildes)
+    runp = runp->next_fd;
+
+  return (runp != NULL && runp->aiocbp->aiocb.aio_fildes == fildes
+         ? runp : NULL);
+}
 
 
 /* The thread handler.  */
 static void *handle_fildes_io (void *arg);
 
 
 
 
 /* The thread handler.  */
 static void *handle_fildes_io (void *arg);
 
 
+/* User optimization.  */
+void
+__aio_init (const struct aioinit *init)
+{
+  /* Get the mutex.  */
+  pthread_mutex_lock (&__aio_requests_mutex);
+
+  /* Only allow writing new values if the table is not yet allocated.  */
+  if (pool == NULL)
+    {
+      optim.aio_threads = init->aio_threads < 1 ? 1 : init->aio_threads;
+      optim.aio_num = (init->aio_num < ENTRIES_PER_ROW
+                      ? ENTRIES_PER_ROW
+                      : init->aio_num & ~ENTRIES_PER_ROW);
+    }
+
+  /* Release the mutex.  */
+  pthread_mutex_unlock (&__aio_requests_mutex);
+}
+weak_alias (__aio_init, aio_init)
+
+
 /* The main function of the async I/O handling.  It enqueues requests
    and if necessary starts and handles threads.  */
 /* The main function of the async I/O handling.  It enqueues requests
    and if necessary starts and handles threads.  */
-int
-__aio_enqueue_request (aiocb_union *aiocbp, int operation, int require_lock)
+struct requestlist *
+__aio_enqueue_request (aiocb_union *aiocbp, int operation)
 {
 {
-  int result;
+  int result = 0;
   int policy, prio;
   struct sched_param param;
   int policy, prio;
   struct sched_param param;
-  aiocb_union *runp;
+  struct requestlist *last, *runp, *newp;
+  int running = no;
 
   if (aiocbp->aiocb.aio_reqprio < 0
       || aiocbp->aiocb.aio_reqprio > AIO_PRIO_DELTA_MAX)
 
   if (aiocbp->aiocb.aio_reqprio < 0
       || aiocbp->aiocb.aio_reqprio > AIO_PRIO_DELTA_MAX)
@@ -71,94 +229,160 @@ __aio_enqueue_request (aiocb_union *aiocbp, int operation, int require_lock)
       __set_errno (EINVAL);
       aiocbp->aiocb.__error_code = EINVAL;
       aiocbp->aiocb.__return_value = -1;
       __set_errno (EINVAL);
       aiocbp->aiocb.__error_code = EINVAL;
       aiocbp->aiocb.__return_value = -1;
-      return -1;
-    }
-
-  if (pthread_getschedparam (pthread_self (), &policy, &param) < 0)
-    {
-      /* Something went wrong.  */
-      aiocbp->aiocb.__error_code = errno;
-      aiocbp->aiocb.__return_value = -1;
-      return -1;
+      return NULL;
     }
 
   /* Compute priority for this request.  */
     }
 
   /* Compute priority for this request.  */
+  pthread_getschedparam (pthread_self (), &policy, &param);
   prio = param.sched_priority - aiocbp->aiocb.aio_reqprio;
 
   prio = param.sched_priority - aiocbp->aiocb.aio_reqprio;
 
+  /* Get the mutex.  */
+  pthread_mutex_lock (&__aio_requests_mutex);
 
 
-  /* Get the semaphore.  */
-  if (require_lock)
-    sem_wait (&__aio_requests_sema);
-
-  runp = __aio_requests;
+  last = NULL;
+  runp = requests;
   /* First look whether the current file descriptor is currently
      worked with.  */
   /* First look whether the current file descriptor is currently
      worked with.  */
-  while (runp != NULL && runp->aiocb.aio_fildes < aiocbp->aiocb.aio_fildes)
-    runp = (aiocb_union *) runp->aiocb.__next_fd;
+  while (runp != NULL
+        && runp->aiocbp->aiocb.aio_fildes < aiocbp->aiocb.aio_fildes)
+    {
+      last = runp;
+      runp = runp->next_fd;
+    }
 
 
-  if (runp != NULL)
+  /* Get a new element for the waiting list.  */
+  newp = get_elem ();
+  if (newp == NULL)
+    {
+      __set_errno (EAGAIN);
+      pthread_mutex_unlock (&__aio_requests_mutex);
+      return NULL;
+    }
+  newp->aiocbp = aiocbp;
+  newp->waiting = NULL;
+
+  aiocbp->aiocb.__abs_prio = prio;
+  aiocbp->aiocb.__policy = policy;
+  aiocbp->aiocb.aio_lio_opcode = operation;
+  aiocbp->aiocb.__error_code = EINPROGRESS;
+  aiocbp->aiocb.__return_value = 0;
+
+  if (runp != NULL
+      && runp->aiocbp->aiocb.aio_fildes == aiocbp->aiocb.aio_fildes)
     {
       /* The current file descriptor is worked on.  It makes no sense
     {
       /* The current file descriptor is worked on.  It makes no sense
-        to start another thread since this new thread would have to
-        wait for the previous one to terminate.  Simply enqueue it
-        after the running one according to the priority.  */
-      while (runp->aiocb.__next_prio != NULL
-            && runp->aiocb.__next_prio->__abs_prio >= prio)
-       runp = (aiocb_union *) runp->aiocb.__next_prio;
-
-      aiocbp->aiocb.__next_prio = runp->aiocb.__next_prio;
-      aiocbp->aiocb.__abs_prio = prio;
-      aiocbp->aiocb.__policy = policy;
-      aiocbp->aiocb.aio_lio_opcode = operation;
-      aiocbp->aiocb.__error_code = EINPROGRESS;
-      aiocbp->aiocb.__return_value = 0;
-      runp->aiocb.__next_prio = (struct aiocb *) aiocbp;
-
-      result = 0;
+        to start another thread since this new thread would fight
+        with the running thread for the resources.  But we also cannot
+        say that the thread processing this desriptor shall imeediately
+        after finishing the current job process this request if there
+        are other threads in the running queue which have a higher
+        priority.  */
+
+      /* Simply enqueue it after the running one according to the
+        priority.  */
+      while (runp->next_prio != NULL
+            && runp->next_prio->aiocbp->aiocb.__abs_prio >= prio)
+       runp = runp->next_prio;
+
+      newp->next_prio = runp->next_prio;
+      runp->next_prio = newp;
+
+      running = queued;
     }
   else
     {
     }
   else
     {
-      /* We create a new thread for this file descriptor.  The
+      /* Enqueue this request for a new descriptor.  */
+      if (last == NULL)
+       {
+         newp->last_fd = NULL;
+         newp->next_fd = requests;
+         if (requests != NULL)
+           requests->last_fd = newp;
+         requests = newp;
+       }
+      else
+       {
+         newp->next_fd = last->next_fd;
+         newp->last_fd = last;
+         last->next_fd = newp;
+         if (newp->next_fd != NULL)
+           newp->next_fd->last_fd = newp;
+       }
+
+      newp->next_prio = NULL;
+    }
+
+  if (running == no)
+    {
+      /* We try to create a new thread for this file descriptor.  The
         function which gets called will handle all available requests
         for this descriptor and when all are processed it will
         function which gets called will handle all available requests
         for this descriptor and when all are processed it will
-        terminate.  */
-      pthread_t thid;
-      pthread_attr_t attr;
-
-      /* First enqueue the request (the list is empty).  */
-      aiocbp->aiocb.__next_fd = NULL;
-      aiocbp->aiocb.__last_fd = NULL;
-
-      aiocbp->aiocb.__next_prio = NULL;
-      aiocbp->aiocb.__abs_prio = prio;
-      aiocbp->aiocb.__policy = policy;
-      aiocbp->aiocb.aio_lio_opcode = operation;
-      aiocbp->aiocb.__error_code = EINPROGRESS;
-      aiocbp->aiocb.__return_value = 0;
-
-      /* Make sure the thread is created detached.  */
-      pthread_attr_init (&attr);
-      pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
-
-      /* Now try to start a thread.  */
-      if (pthread_create (&thid, &attr, handle_fildes_io, aiocbp) < 0)
+        terminate.
+
+        If no new thread can be created or if the specified limit of
+        threads for AIO is reached we queue the request.  */
+
+      /* See if we can create a thread.  */
+      if (nthreads < optim.aio_threads)
        {
        {
-         result = -1;
-         aiocbp->aiocb.__error_code = errno;
-         aiocbp->aiocb.__return_value = -1;
+         pthread_t thid;
+         pthread_attr_t attr;
+
+         /* Make sure the thread is created detached.  */
+         pthread_attr_init (&attr);
+         pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
+
+         /* Now try to start a thread.  */
+         if (pthread_create (&thid, &attr, handle_fildes_io, newp) == 0)
+           {
+             /* We managed to enqueue the request.  All errors which can
+                happen now can be recognized by calls to `aio_return' and
+                `aio_error'.  */
+             running = allocated;
+             ++nthreads;
+           }
+         else if (nthreads == 0)
+           /* We cannot create a thread in the moment and there is
+              also no thread running.  This is a problem.  `errno' is
+              set to EAGAIN if this is only a temporary problem.  */
+           result = -1;
+       }
+    }
+
+  /* Enqueue the request in the run queue if it is not yet running.  */
+  if (running < yes && result == 0)
+    {
+      if (runlist == NULL || runlist->aiocbp->aiocb.__abs_prio < prio)
+       {
+         newp->next_run = runlist;
+         runlist = newp;
        }
       else
        }
       else
-       /* We managed to enqueue the request.  All errors which can
-          happen now can be recognized by calls to `aio_return' and
-          `aio_error'.  */
-         result = 0;
+       {
+         runp = runlist;
+
+         while (runp->next_run != NULL
+                && runp->next_run->aiocbp->aiocb.__abs_prio >= prio)
+           runp = runp->next_run;
+
+         newp->next_run = runp->next_run;
+         runp->next_run = newp;
+       }
     }
 
     }
 
-  /* Release the semaphore.  */
-  if (require_lock)
-    sem_post (&__aio_requests_sema);
+  if (result == 0)
+    newp->running = running;
+  else
+    {
+      /* Something went wrong.  */
+      __aio_free_req (newp);
+      newp = NULL;
+    }
 
 
-  return result;
+  /* Release the mutex.  */
+  pthread_mutex_unlock (&__aio_requests_mutex);
+
+  return newp;
 }
 
 
 }
 
 
@@ -167,140 +391,156 @@ handle_fildes_io (void *arg)
 {
   pthread_t self = pthread_self ();
   struct sched_param param;
 {
   pthread_t self = pthread_self ();
   struct sched_param param;
-  aiocb_union *runp = (aiocb_union *) arg;
+  struct requestlist *runp = (struct requestlist *) arg;
+  aiocb_union *aiocbp = runp->aiocbp;
   int policy;
   int policy;
-  int fildes = runp->aiocb.aio_fildes; /* This is always the same.  */
+  int fildes = runp->aiocbp->aiocb.aio_fildes;
 
   pthread_getschedparam (self, &policy, &param);
 
   do
     {
       /* Change the priority to the requested value (if necessary).  */
 
   pthread_getschedparam (self, &policy, &param);
 
   do
     {
       /* Change the priority to the requested value (if necessary).  */
-      if (runp->aiocb.__abs_prio != param.sched_priority
-         || runp->aiocb.__policy != policy)
+      if (aiocbp->aiocb.__abs_prio != param.sched_priority
+         || aiocbp->aiocb.__policy != policy)
        {
        {
-         param.sched_priority = runp->aiocb.__abs_prio;
-         policy = runp->aiocb.__policy;
+         param.sched_priority = aiocbp->aiocb.__abs_prio;
+         policy = aiocbp->aiocb.__policy;
          pthread_setschedparam (self, policy, &param);
        }
 
       /* Process request pointed to by RUNP.  We must not be disturbed
         by signals.  */
          pthread_setschedparam (self, policy, &param);
        }
 
       /* Process request pointed to by RUNP.  We must not be disturbed
         by signals.  */
-      if ((runp->aiocb.aio_lio_opcode & 127) == LIO_READ)
+      if ((aiocbp->aiocb.aio_lio_opcode & 127) == LIO_READ)
        {
        {
-         if (runp->aiocb.aio_lio_opcode & 128)
-           runp->aiocb.__return_value =
+         if (aiocbp->aiocb.aio_lio_opcode & 128)
+           aiocbp->aiocb.__return_value =
              TEMP_FAILURE_RETRY (__pread64 (fildes,
              TEMP_FAILURE_RETRY (__pread64 (fildes,
-                                            (void *) runp->aiocb64.aio_buf,
-                                            runp->aiocb64.aio_nbytes,
-                                            runp->aiocb64.aio_offset));
+                                            (void *) aiocbp->aiocb64.aio_buf,
+                                            aiocbp->aiocb64.aio_nbytes,
+                                            aiocbp->aiocb64.aio_offset));
          else
          else
-           runp->aiocb.__return_value =
+           aiocbp->aiocb.__return_value =
              TEMP_FAILURE_RETRY (__pread (fildes,
              TEMP_FAILURE_RETRY (__pread (fildes,
-                                          (void *) runp->aiocb.aio_buf,
-                                          runp->aiocb.aio_nbytes,
-                                          runp->aiocb.aio_offset));
+                                          (void *) aiocbp->aiocb.aio_buf,
+                                          aiocbp->aiocb.aio_nbytes,
+                                          aiocbp->aiocb.aio_offset));
        }
        }
-      else if ((runp->aiocb.aio_lio_opcode & 127) == LIO_WRITE)
+      else if ((aiocbp->aiocb.aio_lio_opcode & 127) == LIO_WRITE)
        {
        {
-         if (runp->aiocb.aio_lio_opcode & 128)
-           runp->aiocb.__return_value =
+         if (aiocbp->aiocb.aio_lio_opcode & 128)
+           aiocbp->aiocb.__return_value =
              TEMP_FAILURE_RETRY (__pwrite64 (fildes,
              TEMP_FAILURE_RETRY (__pwrite64 (fildes,
-                                             (const void *) runp->aiocb64.aio_buf,
-                                             runp->aiocb64.aio_nbytes,
-                                             runp->aiocb64.aio_offset));
+                                             (const void *) aiocbp->aiocb64.aio_buf,
+                                             aiocbp->aiocb64.aio_nbytes,
+                                             aiocbp->aiocb64.aio_offset));
          else
          else
-           runp->aiocb.__return_value =
+           aiocbp->aiocb.__return_value =
              TEMP_FAILURE_RETRY (__pwrite (fildes,
              TEMP_FAILURE_RETRY (__pwrite (fildes,
-                                           (const void *) runp->aiocb.aio_buf,
-                                           runp->aiocb.aio_nbytes,
-                                           runp->aiocb.aio_offset));
+                                           (const void *) aiocbp->aiocb.aio_buf,
+                                           aiocbp->aiocb.aio_nbytes,
+                                           aiocbp->aiocb.aio_offset));
        }
        }
-      else if (runp->aiocb.aio_lio_opcode == __LIO_DSYNC)
-       runp->aiocb.__return_value = TEMP_FAILURE_RETRY (fdatasync (fildes));
-      else if (runp->aiocb.aio_lio_opcode == __LIO_SYNC)
-       runp->aiocb.__return_value = TEMP_FAILURE_RETRY (fsync (fildes));
+      else if (aiocbp->aiocb.aio_lio_opcode == LIO_DSYNC)
+       aiocbp->aiocb.__return_value = TEMP_FAILURE_RETRY (fdatasync (fildes));
+      else if (aiocbp->aiocb.aio_lio_opcode == LIO_SYNC)
+       aiocbp->aiocb.__return_value = TEMP_FAILURE_RETRY (fsync (fildes));
       else
        {
          /* This is an invalid opcode.  */
       else
        {
          /* This is an invalid opcode.  */
-         runp->aiocb.__return_value = -1;
+         aiocbp->aiocb.__return_value = -1;
          __set_errno (EINVAL);
        }
 
          __set_errno (EINVAL);
        }
 
-      if (runp->aiocb.__return_value == -1)
-       runp->aiocb.__error_code = errno;
+      /* Get the mutex.  */
+      pthread_mutex_lock (&__aio_requests_mutex);
+
+      if (aiocbp->aiocb.__return_value == -1)
+       aiocbp->aiocb.__error_code = errno;
       else
       else
-       runp->aiocb.__error_code = 0;
+       aiocbp->aiocb.__error_code = 0;
 
       /* Send the signal to notify about finished processing of the
         request.  */
 
       /* Send the signal to notify about finished processing of the
         request.  */
-      if (runp->aiocb.aio_sigevent.sigev_notify == SIGEV_THREAD)
+      __aio_notify (runp);
+
+      /* Now dequeue the current request.  */
+      if (runp->next_prio == NULL)
        {
        {
-         /* We have to start a thread.  */
-         pthread_t tid;
-         pthread_attr_t attr, *pattr;
+         /* No outstanding request for this descriptor.  Process the
+            runlist if necessary.  */
+         if (runp->next_fd != NULL)
+           runp->next_fd->last_fd = runp->last_fd;
+         if (runp->last_fd != NULL)
+           runp->last_fd->next_fd = runp->next_fd;
+       }
+      else
+       {
+         runp->next_prio->last_fd = runp->last_fd;
+         runp->next_prio->next_fd = runp->next_fd;
+         runp->next_prio->running = yes;
+         if (runp->next_fd != NULL)
+           runp->next_fd->last_fd = runp->next_prio;
+         if (runp->last_fd != NULL)
+           runp->last_fd->next_fd = runp->next_prio;
+       }
+
+      /* Free the old element.  */
+      __aio_free_req (runp);
 
 
-         pattr = (pthread_attr_t *)
-           runp->aiocb.aio_sigevent.sigev_notify_attributes;
-         if (pattr == NULL)
+      runp = freelist;
+      if (runp != NULL)
+       {
+         /* We must not run requests which are not marked `running'.  */
+         if (runp->running == yes)
            {
            {
-             pthread_attr_init (&attr);
-             pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
-             pattr = &attr;
+             freelist = runp->next_run;
+             runp->running = allocated;
            }
            }
-
-         if (pthread_create (&tid,
-                             (pthread_attr_t *)
-                             runp->aiocb.aio_sigevent.sigev_notify_attributes,
-                             (void *(*) (void *))
-                             runp->aiocb.aio_sigevent.sigev_notify_function,
-                             runp->aiocb.aio_sigevent.sigev_value.sival_ptr)
-             < 0)
+         else
            {
            {
-             /* XXX What shall we do if already an error is set by
-                read/write/fsync?  */
-             runp->aiocb.__error_code = errno;
-             runp->aiocb.__return_value = -1;
+             struct requestlist *old;
+
+             do
+               {
+                 old = runp;
+                 runp = runp->next_run;
+               }
+             while (runp != NULL && runp->running != yes);
+
+             if (runp != NULL)
+               old->next_run = runp->next_run;
            }
        }
            }
        }
-      else if (runp->aiocb.aio_sigevent.sigev_notify == SIGEV_SIGNAL)
-       /* We have to send a signal.  */
-       if (__aio_sigqueue (runp->aiocb.aio_sigevent.sigev_signo,
-                           runp->aiocb.aio_sigevent.sigev_value) < 0)
-         {
-           /* XXX What shall we do if already an error is set by
-              read/write/fsync?  */
-           runp->aiocb.__error_code = errno;
-           runp->aiocb.__return_value = -1;
-         }
-
-      /* Get the semaphore.  */
-      sem_wait (&__aio_requests_sema);
 
 
-      /* Now dequeue the current request.  */
-      if (runp->aiocb.__next_prio == NULL)
-       {
-         if (runp->aiocb.__next_fd != NULL)
-           runp->aiocb.__next_fd->__last_fd = runp->aiocb.__last_fd;
-         if (runp->aiocb.__last_fd != NULL)
-           runp->aiocb.__last_fd->__next_fd = runp->aiocb.__next_fd;
-         runp = NULL;
-       }
-      else
-       {
-         runp->aiocb.__next_prio->__last_fd = runp->aiocb.__last_fd;
-         runp->aiocb.__next_prio->__next_fd = runp->aiocb.__next_fd;
-         if (runp->aiocb.__next_fd != NULL)
-           runp->aiocb.__next_fd->__last_fd = runp->aiocb.__next_prio;
-         if (runp->aiocb.__last_fd != NULL)
-           runp->aiocb.__last_fd->__next_fd = runp->aiocb.__next_prio;
-         runp = (aiocb_union *) runp->aiocb.__next_prio;
-       }
+      /* If no request to work on we will stop the thread.  */
+      if (runp == NULL)
+       --nthreads;
 
 
-      /* Release the semaphore.  */
-      sem_post (&__aio_requests_sema);
+      /* Release the mutex.  */
+      pthread_mutex_unlock (&__aio_requests_mutex);
     }
   while (runp != NULL);
 
   pthread_exit (NULL);
 }
     }
   while (runp != NULL);
 
   pthread_exit (NULL);
 }
+
+
+/* Free allocated resources.  */
+static void
+__attribute__ ((unused))
+free_res (void)
+{
+  size_t row;
+
+  /* The first block of rows as specified in OPTIM is allocated in
+     one chunk.  */
+  free (pool[0]);
+
+  for (row = optim.aio_num / ENTRIES_PER_ROW; row < pool_tab_size; ++row)
+    free (pool[row]);
+
+  free (pool);
+}
+
+text_set_element (__libc_subfreeres, free_res);
index c2eb9fc..af8e247 100644 (file)
 #ifndef _AIO_MISC_H
 #define _AIO_MISC_H    1
 
 #ifndef _AIO_MISC_H
 #define _AIO_MISC_H    1
 
-#include <semaphore.h>
+#include <aio.h>
+#include <pthread.h>
+
+
+/* Extend the operation enum.  */
+enum
+{
+  LIO_DSYNC = LIO_READ + 1,
+  LIO_SYNC,
+  LIO_READ64 = LIO_READ | 128,
+  LIO_WRITE64 = LIO_WRITE | 128
+};
+
 
 /* Union of the two request types.  */
 typedef union
 
 /* Union of the two request types.  */
 typedef union
@@ -28,18 +40,73 @@ typedef union
     struct aiocb64 aiocb64;
   } aiocb_union;
 
     struct aiocb64 aiocb64;
   } aiocb_union;
 
-/* List of enqueued requests.  */
-extern aiocb_union *__aio_requests;
+
+/* Used to synchronize.  */
+struct waitlist
+  {
+    struct waitlist *next;
+
+    pthread_cond_t *cond;
+    volatile int *counterp;
+    /* The next field is used in asynchronous `lio_listio' operations.  */
+    struct sigevent *sigevp;
+  };
+
+
+/* Status of a request.  */
+enum
+{
+  no,
+  queued,
+  yes,
+  allocated
+};
+
+
+/* Used to queue requests..  */
+struct requestlist
+  {
+    int running;
+
+    struct requestlist *last_fd;
+    struct requestlist *next_fd;
+    struct requestlist *next_prio;
+    struct requestlist *next_run;
+
+    /* Pointer to the actual data.  */
+    aiocb_union *aiocbp;
+
+    /* List of waiting processes.  */
+    struct waitlist *waiting;
+  };
+
 
 /* Lock for global I/O list of requests.  */
 
 /* Lock for global I/O list of requests.  */
-extern sem_t __aio_requests_sema;
+extern pthread_mutex_t __aio_requests_mutex;
 
 
 /* Enqueue request.  */
 
 
 /* Enqueue request.  */
-extern int __aio_enqueue_request (aiocb_union *aiocbp, int operation,
-                                 int require_lock);
+extern struct requestlist *__aio_enqueue_request (aiocb_union *aiocbp,
+                                                 int operation)
+     internal_function;
+
+/* Find request entry for given AIO control block.  */
+extern struct requestlist *__aio_find_req (aiocb_union *elem)
+     internal_function;
+
+/* Find request entry for given file descriptor.  */
+extern struct requestlist *__aio_find_req_fd (int fildes) internal_function;
+
+/* Release the entry for the request.  */
+extern void __aio_free_request (struct requestlist *req) internal_function;
+
+/* Notify initiator of request and tell this everybody listening.  */
+extern void __aio_notify (struct requestlist *req) internal_function;
+
+/* Notify initiator of request.  */
+extern int __aio_notify_only (struct sigevent *sigev) internal_function;
 
 /* Send the signal.  */
 
 /* Send the signal.  */
-extern int __aio_sigqueue (int sig, const union sigval val);
+extern int __aio_sigqueue (int sig, const union sigval val) internal_function;
 
 #endif /* aio_misc.h */
 
 #endif /* aio_misc.h */
index 8286ba9..871ce99 100644 (file)
@@ -27,5 +27,5 @@ int
 aio_read (aiocbp)
      struct aiocb *aiocbp;
 {
 aio_read (aiocbp)
      struct aiocb *aiocbp;
 {
-  return __aio_enqueue_request ((aiocb_union *) aiocbp, LIO_READ, 1);
+  return __aio_enqueue_request ((aiocb_union *) aiocbp, LIO_READ) != NULL;
 }
 }
index bf808fb..b194dda 100644 (file)
@@ -27,5 +27,5 @@ int
 aio_read64 (aiocbp)
      struct aiocb64 *aiocbp;
 {
 aio_read64 (aiocbp)
      struct aiocb64 *aiocbp;
 {
-  return __aio_enqueue_request ((aiocb_union *) aiocbp, __LIO_READ64, 1);
+  return __aio_enqueue_request ((aiocb_union *) aiocbp, LIO_READ64) != NULL;
 }
 }
index 75bf9ba..6123b7b 100644 (file)
@@ -40,16 +40,89 @@ aio_suspend (list, nent, timeout)
      int nent;
      const struct timespec *timeout;
 {
      int nent;
      const struct timespec *timeout;
 {
+  pthread_cond_t cond;
+  struct waitlist waitlist[nent];
+  struct requestlist *requestlist[nent];
   int cnt;
   int cnt;
+  int result = 0;
+
+  /* Request the mutex.  */
+  pthread_mutex_lock (&__aio_requests_mutex);
 
   /* First look whether there is already a terminated request.  */
   for (cnt = 0; cnt < nent; ++cnt)
     if (list[cnt] != NULL && list[cnt]->__error_code != EINPROGRESS)
 
   /* First look whether there is already a terminated request.  */
   for (cnt = 0; cnt < nent; ++cnt)
     if (list[cnt] != NULL && list[cnt]->__error_code != EINPROGRESS)
-      return 0;
+      break;
+
+  if (cnt == nent)
+    {
+      int oldstate;
+
+      /* There is not yet a finished request.  Signal the request that
+        we are working for it.  */
+      for (cnt = 0; cnt < nent; ++cnt)
+       if (list[cnt] != NULL && list[cnt]->__error_code == EINPROGRESS)
+         {
+           requestlist[cnt] = __aio_find_req ((aiocb_union *) list[cnt]);
+
+           if (requestlist[cnt] != NULL)
+             {
+               waitlist[cnt].cond = &cond;
+               waitlist[cnt].next = requestlist[cnt]->waiting;
+               waitlist[cnt].counterp = NULL;
+               waitlist[cnt].sigevp = NULL;
+               requestlist[cnt]->waiting = &waitlist[cnt];
+             }
+         }
+
+      /* Since `pthread_cond_wait'/`pthread_cond_timedwait' are cancelation
+        points we must be careful.  We added entries to the waiting lists
+        which we must remove.  So defer cancelation for now.  */
+      pthread_setcancelstate (PTHREAD_CANCEL_DISABLE, &oldstate);
+
+      if (timeout == NULL)
+       result = pthread_cond_wait (&cond, &__aio_requests_mutex);
+      else
+       result = pthread_cond_timedwait (&cond, &__aio_requests_mutex,
+                                        timeout);
+
+      /* Now remove the entry in the waiting list for all requests
+        which didn't terminate  */
+      for (cnt = 0; cnt < nent; ++cnt)
+       if (list[cnt] != NULL && list[cnt]->__error_code == EINPROGRESS
+           && requestlist[cnt] != NULL)
+         {
+           struct waitlist **listp = &requestlist[cnt]->waiting;
+
+           /* There is the chance that we cannot find our entry anymore.
+              This could happen if the request terminated and restarted
+              again.  */
+           while (*listp != NULL && *listp != &waitlist[cnt])
+             listp = &(*listp)->next;
+
+           if (*listp != NULL)
+             *listp = (*listp)->next;
+         }
+
+      /* Now it's time to restore the cancelation state.  */
+      pthread_setcancelstate (oldstate, NULL);
+
+      if (result != 0)
+       {
+         /* An error occurred.  Possibly it's EINTR.  We have to translate
+            the timeout error report of `pthread_cond_timedwait' to the
+            form expected from `aio_suspend'.  */
+         if (result == ETIMEDOUT)
+           __set_errno (EAGAIN);
+
+         result = -1;
+       }
+    }
 
 
-  /* XXX We have to write code which waits.  */
+  /* Release the mutex.  */
+  pthread_mutex_unlock (&__aio_requests_mutex);
 
 
-  return -1;
+  return result;
 }
 
 weak_alias (aio_suspend, aio_suspend64)
 }
 
 weak_alias (aio_suspend, aio_suspend64)
index 80b827c..8801d89 100644 (file)
@@ -27,5 +27,5 @@ int
 aio_write (aiocbp)
      struct aiocb *aiocbp;
 {
 aio_write (aiocbp)
      struct aiocb *aiocbp;
 {
-  return __aio_enqueue_request ((aiocb_union *) aiocbp, LIO_WRITE, 1);
+  return __aio_enqueue_request ((aiocb_union *) aiocbp, LIO_WRITE) != NULL;
 }
 }
index 477ce2c..4d599b1 100644 (file)
@@ -27,5 +27,5 @@ int
 aio_write64 (aiocbp)
      struct aiocb64 *aiocbp;
 {
 aio_write64 (aiocbp)
      struct aiocb64 *aiocbp;
 {
-  return __aio_enqueue_request ((aiocb_union *) aiocbp, __LIO_WRITE64, 1);
+  return __aio_enqueue_request ((aiocb_union *) aiocbp, LIO_WRITE64) != NULL;
 }
 }
index 73df5c2..b389c6a 100644 (file)
 
 #include <aio.h>
 #include <errno.h>
 
 #include <aio.h>
 #include <errno.h>
-#include <semaphore.h>
+#include <stdlib.h>
 
 #include "aio_misc.h"
 
 
 
 #include "aio_misc.h"
 
 
+/* We need this special structure to handle asynchronous I/O.  */
+struct async_waitlist
+  {
+    int counter;
+    struct sigevent sigev;
+    struct waitlist list[0];
+  };
+
+
 int
 lio_listio (mode, list, nent, sig)
      int mode;
 int
 lio_listio (mode, list, nent, sig)
      int mode;
@@ -32,8 +41,9 @@ lio_listio (mode, list, nent, sig)
      int nent;
      struct sigevent *sig;
 {
      int nent;
      struct sigevent *sig;
 {
+  struct requestlist *requests[nent];
   int cnt;
   int cnt;
-  int total = 0;
+  volatile int total = 0;
   int result = 0;
 
   /* Check arguments.  */
   int result = 0;
 
   /* Check arguments.  */
@@ -43,26 +53,100 @@ lio_listio (mode, list, nent, sig)
       return -1;
     }
 
       return -1;
     }
 
-  /* Request the semaphore.  */
-  sem_wait (&__aio_requests_sema);
+  /* Request the mutex.  */
+  pthread_mutex_lock (&__aio_requests_mutex);
 
   /* Now we can enqueue all requests.  Since we already acquired the
 
   /* Now we can enqueue all requests.  Since we already acquired the
-     semaphore the enqueue function need not do this.  */
+     mutex the enqueue function need not do this.  */
   for (cnt = 0; cnt < nent; ++cnt)
     if (list[cnt] != NULL && list[cnt]->aio_lio_opcode != LIO_NOP)
   for (cnt = 0; cnt < nent; ++cnt)
     if (list[cnt] != NULL && list[cnt]->aio_lio_opcode != LIO_NOP)
-      if (__aio_enqueue_request ((aiocb_union *) list[cnt],
-                                list[cnt]->aio_lio_opcode, 0) >= 0)
-       /* Successfully enqueued.  */
-       ++total;
+      {
+       requests[cnt] =  __aio_enqueue_request ((aiocb_union *) list[cnt],
+                                               list[cnt]->aio_lio_opcode);
+
+       if (requests[cnt] != NULL)
+         /* Successfully enqueued.  */
+         ++total;
+       else
+         /* Signal that we've seen an error.  `errno' and the error code
+            of the aiocb will tell more.  */
+         result = -1;
+      }
+
+  if (total == 0)
+    {
+      /* We don't have anything to do except signalling if we work
+        asynchronously.  */
+      if (mode == LIO_NOWAIT)
+       __aio_notify_only (sig);
+    }
+  else if (mode == LIO_WAIT)
+    {
+      pthread_cond_t cond;
+      struct waitlist waitlist[nent];
+      int oldstate;
+
+      total = 0;
+      for (cnt = 0; cnt < nent; ++cnt)
+       if (list[cnt] != NULL && list[cnt]->aio_lio_opcode != LIO_NOP
+           && requests[cnt] != NULL)
+         {
+           waitlist[cnt].cond = &cond;
+           waitlist[cnt].next = requests[cnt]->waiting;
+           waitlist[cnt].counterp = NULL;
+           waitlist[cnt].sigevp = NULL;
+           requests[cnt]->waiting = &waitlist[cnt];
+           ++total;
+         }
+
+      /* Since `pthread_cond_wait'/`pthread_cond_timedwait' are cancelation
+        points we must be careful.  We added entries to the waiting lists
+        which we must remove.  So defer cancelation for now.  */
+      pthread_setcancelstate (PTHREAD_CANCEL_DISABLE, &oldstate);
+
+      while (total > 0)
+       if (pthread_cond_wait (&cond, &__aio_requests_mutex) == 0)
+         --total;
+
+      /* Now it's time to restore the cancelation state.  */
+      pthread_setcancelstate (oldstate, NULL);
+    }
+  else
+    {
+      struct async_waitlist *waitlist;
+
+      waitlist = (struct async_waitlist *)
+       malloc (sizeof (struct async_waitlist)
+               + (nent * sizeof (struct waitlist)));
+
+      if (waitlist == NULL)
+       {
+         __set_errno (EAGAIN);
+         result = -1;
+       }
       else
       else
-       /* Signal that we've seen an error.  `errno' and the error code
-          of the aiocb will tell more.  */
-       result = -1;
+       {
+         total = 0;
 
 
+         for (cnt = 0; cnt < nent; ++cnt)
+           if (list[cnt] != NULL && list[cnt]->aio_lio_opcode != LIO_NOP
+               && requests[cnt] != NULL)
+             {
+               waitlist->list[cnt].cond = NULL;
+               waitlist->list[cnt].next = requests[cnt]->waiting;
+               waitlist->list[cnt].counterp = &waitlist->counter;
+               waitlist->list[cnt].sigevp = &waitlist->sigev;
+               requests[cnt]->waiting = &waitlist->list[cnt];
+               ++total;
+             }
 
 
+         waitlist->counter = total;
+         waitlist->sigev = *sig;
+       }
+    }
 
 
-  /* Release the semaphore.  */
-  sem_post (&__aio_requests_sema);
+  /* Release the mutex.  */
+  pthread_mutex_unlock (&__aio_requests_mutex);
 
   return result;
 }
 
   return result;
 }
index b17e9ed..19f56ad 100644 (file)
 
 #include <aio.h>
 #include <errno.h>
 
 #include <aio.h>
 #include <errno.h>
-#include <semaphore.h>
+#include <stdlib.h>
 
 #include "aio_misc.h"
 
 
 
 #include "aio_misc.h"
 
 
+/* We need this special structure to handle asynchronous I/O.  */
+struct async_waitlist
+  {
+    int counter;
+    struct sigevent sigev;
+    struct waitlist list[0];
+  };
+
+
 int
 lio_listio64 (mode, list, nent, sig)
      int mode;
 int
 lio_listio64 (mode, list, nent, sig)
      int mode;
@@ -32,8 +41,9 @@ lio_listio64 (mode, list, nent, sig)
      int nent;
      struct sigevent *sig;
 {
      int nent;
      struct sigevent *sig;
 {
+  struct requestlist *requests[nent];
   int cnt;
   int cnt;
-  int total = 0;
+  volatile int total = 0;
   int result = 0;
 
   /* Check arguments.  */
   int result = 0;
 
   /* Check arguments.  */
@@ -43,24 +53,99 @@ lio_listio64 (mode, list, nent, sig)
       return -1;
     }
 
       return -1;
     }
 
-  /* Request the semaphore.  */
-  sem_wait (&__aio_requests_sema);
+  /* Request the mutex.  */
+  pthread_mutex_lock (&__aio_requests_mutex);
 
   /* Now we can enqueue all requests.  Since we already acquired the
 
   /* Now we can enqueue all requests.  Since we already acquired the
-     semaphore the enqueue function need not do this.  */
+     mutex the enqueue function need not do this.  */
   for (cnt = 0; cnt < nent; ++cnt)
     if (list[cnt] != NULL && list[cnt]->aio_lio_opcode != LIO_NOP)
   for (cnt = 0; cnt < nent; ++cnt)
     if (list[cnt] != NULL && list[cnt]->aio_lio_opcode != LIO_NOP)
-      if (__aio_enqueue_request ((aiocb_union *) list[cnt],
-                                list[cnt]->aio_lio_opcode | 128, 0) >= 0)
-       /* Successfully enqueued.  */
-       ++total;
+      {
+       requests[cnt] = __aio_enqueue_request ((aiocb_union *) list[cnt],
+                                              (list[cnt]->aio_lio_opcode
+                                               | 128));
+       if (requests[cnt] != NULL)
+         /* Successfully enqueued.  */
+         ++total;
+       else
+         /* Signal that we've seen an error.  `errno' and the error code
+            of the aiocb will tell more.  */
+         result = -1;
+      }
+
+  if (total == 0)
+    {
+      /* We don't have anything to do except signalling if we work
+        asynchronously.  */
+      if (mode == LIO_NOWAIT)
+       __aio_notify_only (sig);
+    }
+  else if (mode == LIO_WAIT)
+    {
+      pthread_cond_t cond;
+      struct waitlist waitlist[nent];
+      int oldstate;
+
+      total = 0;
+      for (cnt = 0; cnt < nent; ++cnt)
+       if (list[cnt] != NULL && list[cnt]->aio_lio_opcode != LIO_NOP
+           && requests[cnt] != NULL)
+         {
+           waitlist[cnt].cond = &cond;
+           waitlist[cnt].next = requests[cnt]->waiting;
+           waitlist[cnt].counterp = &total;
+           waitlist[cnt].sigevp = NULL;
+           requests[cnt]->waiting = &waitlist[cnt];
+           ++total;
+         }
+
+      /* Since `pthread_cond_wait'/`pthread_cond_timedwait' are cancelation
+        points we must be careful.  We added entries to the waiting lists
+        which we must remove.  So defer cancelation for now.  */
+      pthread_setcancelstate (PTHREAD_CANCEL_DISABLE, &oldstate);
+
+      while (total > 0)
+       pthread_cond_wait (&cond, &__aio_requests_mutex);
+
+      /* Now it's time to restore the cancelation state.  */
+      pthread_setcancelstate (oldstate, NULL);
+    }
+  else
+    {
+      struct async_waitlist *waitlist;
+
+      waitlist = (struct async_waitlist *)
+       malloc (sizeof (struct async_waitlist)
+               + (nent * sizeof (struct waitlist)));
+
+      if (waitlist == NULL)
+       {
+         __set_errno (EAGAIN);
+         result = -1;
+       }
       else
       else
-       /* Signal that we've seen an error.  `errno' and the error code
-          of the aiocb will tell more.  */
-       result = -1;
+       {
+         total = 0;
+
+         for (cnt = 0; cnt < nent; ++cnt)
+           if (list[cnt] != NULL && list[cnt]->aio_lio_opcode != LIO_NOP
+               && requests[cnt] != NULL)
+             {
+               waitlist->list[cnt].cond = NULL;
+               waitlist->list[cnt].next = requests[cnt]->waiting;
+               waitlist->list[cnt].counterp = &waitlist->counter;
+               waitlist->list[cnt].sigevp = &waitlist->sigev;
+               requests[cnt]->waiting = &waitlist->list[cnt];
+               ++total;
+             }
+
+         waitlist->counter = total;
+         waitlist->sigev = *sig;
+       }
+    }
 
 
-  /* Release the semaphore.  */
-  sem_post (&__aio_requests_sema);
+  /* Release the mutex.  */
+  pthread_mutex_unlock (&__aio_requests_mutex);
 
   return result;
 }
 
   return result;
 }