Prex Home / Browse Source - Prex Version: 0.9.0

root/sys/ipc/msg.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. msg_send
  2. msg_receive
  3. msg_reply
  4. msg_cancel
  5. msg_abort
  6. msg_dequeue
  7. msg_enqueue
  8. msg_init

   1 /*-
   2  * Copyright (c) 2005-2007, Kohsuke Ohtani
   3  * All rights reserved.
   4  *
   5  * Redistribution and use in source and binary forms, with or without
   6  * modification, are permitted provided that the following conditions
   7  * are met:
   8  * 1. Redistributions of source code must retain the above copyright
   9  *    notice, this list of conditions and the following disclaimer.
  10  * 2. Redistributions in binary form must reproduce the above copyright
  11  *    notice, this list of conditions and the following disclaimer in the
  12  *    documentation and/or other materials provided with the distribution.
  13  * 3. Neither the name of the author nor the names of any co-contributors
  14  *    may be used to endorse or promote products derived from this software
  15  *    without specific prior written permission.
  16  *
  17  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
  18  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  19  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  20  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
  21  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
  22  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
  23  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
  24  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
  25  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
  26  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
  27  * SUCH DAMAGE.
  28  */
  29 
  30 /*
  31  * msg.c - routines to transmit a message.
  32  */
  33 
/**
 * IPC transmission:
 *
 * Messages are sent to a specific object by using msg_send.  The
 * transmission of a message is completely synchronous in this
 * kernel. This means the thread which sent a message is blocked until
 * it receives a response from another thread.  msg_receive performs
 * reception of a message. msg_receive also blocks when no message
 * has arrived at the target object.  The receiver thread must answer
 * the message using msg_reply after processing the message.
 *
 * The receiver thread can not receive an additional message until it
 * replies to the sender. In short, a thread can receive only one
 * message at a time. On the other hand, once the thread receives a
 * message, it can send another message to a different object. This
 * mechanism allows threads to redirect the sender's request to another
 * thread.
 *
 * A message is copied from thread to thread directly without any kernel
 * buffering. The message buffer in the sender's memory space is
 * automatically mapped to the receiver's memory by the kernel. Since
 * there is no page-out of memory in this system, we can copy the message
 * data via physical memory at any time.
 */
  57 
  58 #include <kernel.h>
  59 #include <sched.h>
  60 #include <task.h>
  61 #include <kmem.h>
  62 #include <thread.h>
  63 #include <task.h>
  64 #include <event.h>
  65 #include <ipc.h>
  66 
  67 /* forward declarations */
  68 static thread_t msg_dequeue(queue_t);
  69 static void     msg_enqueue(queue_t, thread_t);
  70 
  71 static struct event ipc_event;          /* event for IPC operation */
  72 
  73 /*
  74  * Send a message.
  75  *
  76  * The current thread will be blocked until any other thread
  77  * receives and reply the message.  A thread can send a
  78  * message to any object if it knows the object id.
  79  */
  80 int
  81 msg_send(object_t obj, void *msg, size_t size)
  82 {
  83         struct msg_header *hdr;
  84         thread_t t;
  85         void *kmsg;
  86         int rc;
  87 
  88         if (!user_area(msg))
  89                 return EFAULT;
  90 
  91         if (size < sizeof(struct msg_header))
  92                 return EINVAL;
  93 
  94         sched_lock();
  95 
  96         if (!object_valid(obj)) {
  97                 sched_unlock();
  98                 return EINVAL;
  99         }
 100         /*
 101          * A thread can not send a message when it is
 102          * already receiving from the target object.
 103          * It will obviously cause a deadlock.
 104          */
 105         if (obj == curthread->recvobj) {
 106                 sched_unlock();
 107                 return EDEADLK;
 108         }
 109         /*
 110          * Translate message address to the kernel linear
 111          * address.  So that a receiver thread can access
 112          * the message via kernel pointer. We can catch
 113          * the page fault here.
 114          */
 115         if ((kmsg = kmem_map(msg, size)) == NULL) {
 116                 sched_unlock();
 117                 return EFAULT;
 118         }
 119         curthread->msgaddr = kmsg;
 120         curthread->msgsize = size;
 121 
 122         /*
 123          * The sender ID is filled in the message header
 124          * by the kernel. So, the receiver can trust it.
 125          */
 126         hdr = (struct msg_header *)kmsg;
 127         hdr->task = curtask;
 128 
 129         /*
 130          * If receiver already exists, wake it up.
 131          * The highest priority thread can get the message.
 132          */
 133         if (!queue_empty(&obj->recvq)) {
 134                 t = msg_dequeue(&obj->recvq);
 135                 sched_unsleep(t, 0);
 136         }
 137         /*
 138          * Sleep until we get a reply message.
 139          * Note: Do not touch any data in the object
 140          * structure after we wakeup. This is because the
 141          * target object may be deleted while we are sleeping.
 142          */
 143         curthread->sendobj = obj;
 144         msg_enqueue(&obj->sendq, curthread);
 145         rc = sched_sleep(&ipc_event);
 146         if (rc == SLP_INTR)
 147                 queue_remove(&curthread->ipc_link);
 148         curthread->sendobj = NULL;
 149 
 150         sched_unlock();
 151 
 152         /*
 153          * Check sleep result.
 154          */
 155         switch (rc) {
 156         case SLP_BREAK:
 157                 return EAGAIN;  /* Receiver has been terminated */
 158         case SLP_INVAL:
 159                 return EINVAL;  /* Object has been deleted */
 160         case SLP_INTR:
 161                 return EINTR;   /* Exception */
 162         default:
 163                 /* DO NOTHING */
 164                 break;
 165         }
 166         return 0;
 167 }
 168 
 169 /*
 170  * Receive a message.
 171  *
 172  * A thread can receive a message from the object which was
 173  * created by any thread belongs to same task. If the message
 174  * has not reached yet, it blocks until any message comes in.
 175  *
 176  * The size argument specifies the "maximum" size of the message
 177  * buffer to receive. If the sent message is larger than this
 178  * size, the kernel will automatically clip the message to this
 179  * maximum buffer size.
 180  *
 181  * When a message is received, the sender thread is removed from
 182  * object's send queue. So, another thread can receive the
 183  * subsequent message from that object. This is important for
 184  * the multi-thread server which must receive multiple messages
 185  * simultaneously.
 186  */
 187 int
 188 msg_receive(object_t obj, void *msg, size_t size)
 189 {
 190         thread_t t;
 191         size_t len;
 192         int rc, error = 0;
 193 
 194         if (!user_area(msg))
 195                 return EFAULT;
 196 
 197         sched_lock();
 198 
 199         if (!object_valid(obj)) {
 200                 sched_unlock();
 201                 return EINVAL;
 202         }
 203         if (obj->owner != curtask) {
 204                 sched_unlock();
 205                 return EACCES;
 206         }
 207         /*
 208          * Check if this thread finished previous receive
 209          * operation.  A thread can not receive different
 210          * messages at once.
 211          */
 212         if (curthread->recvobj) {
 213                 sched_unlock();
 214                 return EBUSY;
 215         }
 216         curthread->recvobj = obj;
 217 
 218         /*
 219          * If no message exists, wait until message arrives.
 220          */
 221         while (queue_empty(&obj->sendq)) {
 222                 /*
 223                  * Block until someone sends a message.
 224                  */
 225                 msg_enqueue(&obj->recvq, curthread);
 226                 rc = sched_sleep(&ipc_event);
 227                 if (rc != 0) {
 228                         /*
 229                          * Receive is failed due to some reasons.
 230                          */
 231                         switch (rc) {
 232                         case SLP_INVAL:
 233                                 error = EINVAL; /* Object has been deleted */
 234                                 break;
 235                         case SLP_INTR:
 236                                 queue_remove(&curthread->ipc_link);
 237                                 error = EINTR;  /* Got exception */
 238                                 break;
 239                         default:
 240                                 panic("msg_receive");
 241                                 break;
 242                         }
 243                         curthread->recvobj = NULL;
 244                         sched_unlock();
 245                         return error;
 246                 }
 247 
 248                 /*
 249                  * Check the existence of the sender thread again.
 250                  * Even if this thread is woken by the sender thread,
 251                  * the message may be received by another thread.
 252                  * This may happen when another high priority thread
 253                  * becomes runnable before we receive the message.
 254                  */
 255         }
 256 
 257         t = msg_dequeue(&obj->sendq);
 258 
 259         /*
 260          * Copy out the message to the user-space.
 261          */
 262         len = MIN(size, t->msgsize);
 263         if (len > 0) {
 264                 if (copyout(t->msgaddr, msg, len)) {
 265                         msg_enqueue(&obj->sendq, t);
 266                         curthread->recvobj = NULL;
 267                         sched_unlock();
 268                         return EFAULT;
 269                 }
 270         }
 271         /*
 272          * Detach the message from the target object.
 273          */
 274         curthread->sender = t;
 275         t->receiver = curthread;
 276 
 277         sched_unlock();
 278         return error;
 279 }
 280 
 281 /*
 282  * Send a reply message.
 283  *
 284  * The target object must be the object that we are receiving.
 285  * Otherwise, this function will be failed.
 286  */
 287 int
 288 msg_reply(object_t obj, void *msg, size_t size)
 289 {
 290         thread_t t;
 291         size_t len;
 292 
 293         if (!user_area(msg))
 294                 return EFAULT;
 295 
 296         sched_lock();
 297 
 298         if (!object_valid(obj) || obj != curthread->recvobj) {
 299                 sched_unlock();
 300                 return EINVAL;
 301         }
 302         /*
 303          * Check if sender still exists
 304          */
 305         if (curthread->sender == NULL) {
 306                 /* Clear receive state */
 307                 curthread->recvobj = NULL;
 308                 sched_unlock();
 309                 return EINVAL;
 310         }
 311         /*
 312          * Copy a message to the sender's buffer.
 313          */
 314         t = curthread->sender;
 315         len = MIN(size, t->msgsize);
 316         if (len > 0) {
 317                 if (copyin(msg, t->msgaddr, len)) {
 318                         sched_unlock();
 319                         return EFAULT;
 320                 }
 321         }
 322         /*
 323          * Wakeup sender with no error.
 324          */
 325         sched_unsleep(t, 0);
 326         t->receiver = NULL;
 327 
 328         /* Clear transmit state */
 329         curthread->sender = NULL;
 330         curthread->recvobj = NULL;
 331 
 332         sched_unlock();
 333         return 0;
 334 }
 335 
 336 /*
 337  * Cancel pending message operation of the specified thread.
 338  * This is called when the thread is terminated.
 339  *
 340  * We have to handle the following conditions to prevent deadlock.
 341  *
 342  * If the terminated thread is sending a message:
 343  *  1. A message is already received.
 344  *     -> The receiver thread will reply to the invalid thread.
 345  *
 346  *  2. A message is not received yet.
 347  *     -> The thread remains in send queue of the object.
 348  *
 349  * When the terminated thread is receiving a message.
 350  *  3. A message is already sent.
 351  *     -> The sender thread will wait for reply forever.
 352  *
 353  *  4. A message is not sent yet.
 354  *     -> The thread remains in receive queue of the object.
 355  */
 356 void
 357 msg_cancel(thread_t t)
 358 {
 359 
 360         sched_lock();
 361 
 362         if (t->sendobj != NULL) {
 363                 if (t->receiver != NULL)
 364                         t->receiver->sender = NULL;
 365                 else
 366                         queue_remove(&t->ipc_link);
 367         }
 368         if (t->recvobj != NULL) {
 369                 if (t->sender != NULL) {
 370                         sched_unsleep(t->sender, SLP_BREAK);
 371                         t->sender->receiver = NULL;
 372                 } else
 373                         queue_remove(&t->ipc_link);
 374         }
 375         sched_unlock();
 376 }
 377 
 378 /*
 379  * Abort all message operations relevant to the specified object.
 380  * This is called when the target object is deleted.
 381  */
 382 void
 383 msg_abort(object_t obj)
 384 {
 385         queue_t q;
 386         thread_t t;
 387 
 388         sched_lock();
 389 
 390         /*
 391          * Force wakeup all threads in the send queue.
 392          */
 393         while (!queue_empty(&obj->sendq)) {
 394                 q = dequeue(&obj->sendq);
 395                 t = queue_entry(q, struct thread, ipc_link);
 396                 sched_unsleep(t, SLP_INVAL);
 397         }
 398         /*
 399          * Force wakeup all threads waiting for receive.
 400          */
 401         while (!queue_empty(&obj->recvq)) {
 402                 q = dequeue(&obj->recvq);
 403                 t = queue_entry(q, struct thread, ipc_link);
 404                 sched_unsleep(t, SLP_INVAL);
 405         }
 406         sched_unlock();
 407 }
 408 
 409 /*
 410  * Dequeue thread from the IPC queue.
 411  * The most highest priority thread will be chosen.
 412  */
 413 static thread_t
 414 msg_dequeue(queue_t head)
 415 {
 416         queue_t q;
 417         thread_t t, top;
 418 
 419         q = queue_first(head);
 420         top = queue_entry(q, struct thread, ipc_link);
 421 
 422         while (!queue_end(head, q)) {
 423                 t = queue_entry(q, struct thread, ipc_link);
 424                 if (t->priority < top->priority)
 425                         top = t;
 426                 q = queue_next(q);
 427         }
 428         queue_remove(&top->ipc_link);
 429         return top;
 430 }
 431 
 432 static void
 433 msg_enqueue(queue_t head, thread_t t)
 434 {
 435 
 436         enqueue(head, &t->ipc_link);
 437 }
 438 
 439 void
 440 msg_init(void)
 441 {
 442 
 443         event_init(&ipc_event, "ipc");
 444 }

/* [<][>][^][v][top][bottom][index][help] */