Data Structure

// State for one reactor instance: an epoll set plus the bookkeeping needed to
// stop the loop and to unregister objects safely from any thread.
struct reactor_t {
  // epoll instance created in reactor_new; run_reactor blocks on it.
  int epoll_fd;
  // eventfd registered in the epoll set with a NULL data pointer;
  // reactor_stop writes to it to make run_reactor return.
  int event_fd;
  // guards |invalidation_list|.
  std::mutex* list_mutex;
  list_t* invalidation_list;  // objects unregistered from a thread other than
                              // the reactor thread; cleared at the top of
                              // every loop iteration in run_reactor.
  pthread_t run_thread;       // the pthread on which run_reactor is executing.
  bool is_running;            // indicates whether |run_thread| is valid.
  // set by reactor_unregister when it is called on the reactor thread while
  // the loop is running; run_reactor then frees the object after its
  // callbacks have returned.
  bool object_removed;
};

// One registration of a file descriptor with a reactor. Created by
// reactor_register and destroyed either by reactor_unregister or, when the
// unregister happens from inside a callback, by run_reactor.
struct reactor_object_t {
  int fd;              // the file descriptor to monitor for events.
  void* context;       // a context that's passed back to the *_ready functions.
  reactor_t* reactor;  // the reactor instance this object is registered with.
  std::mutex* mutex;  // protects the lifetime of this object and all variables;
                      // held by run_reactor while the callbacks execute.

  void (*read_ready)(void* context);   // function to call when the file
                                       // descriptor becomes readable; may be
                                       // NULL, in which case no read interest
                                       // is registered.
  void (*write_ready)(void* context);  // function to call when the file
                                       // descriptor becomes writeable; may be
                                       // NULL, in which case no write interest
                                       // is registered.
};

Initialization

// Allocates and initializes a reactor: creates the epoll instance, the
// eventfd used to stop the loop, the invalidation list and its mutex, and
// registers the eventfd with the epoll set.
//
// Returns the new reactor, or NULL if any step fails. On failure the
// partially built reactor is handed to reactor_free before returning.
reactor_t* reactor_new(void) {
  reactor_t* reactor = (reactor_t*)osi_calloc(sizeof(reactor_t));

  // Mark both descriptors invalid first so that a failure part-way through
  // never leaves a zero (i.e. a real descriptor number) in either field.
  reactor->epoll_fd = INVALID_FD;
  reactor->event_fd = INVALID_FD;

  reactor->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
  if (reactor->epoll_fd == INVALID_FD) {
    LOG_ERROR("%s unable to create epoll instance: %s", __func__,
              strerror(errno));
    reactor_free(reactor);
    return NULL;
  }

  reactor->event_fd = eventfd(0, 0);
  if (reactor->event_fd == INVALID_FD) {
    LOG_ERROR("%s unable to create eventfd: %s", __func__, strerror(errno));
    reactor_free(reactor);
    return NULL;
  }

  reactor->list_mutex = new std::mutex;
  reactor->invalidation_list = list_new(NULL);
  if (!reactor->invalidation_list) {
    LOG_ERROR("%s unable to allocate object invalidation list.", __func__);
    reactor_free(reactor);
    return NULL;
  }

  // The stop eventfd is the only entry registered with a NULL data pointer;
  // run_reactor relies on that to tell it apart from registered objects.
  struct epoll_event stop_event;
  memset(&stop_event, 0, sizeof(stop_event));
  stop_event.data.ptr = NULL;
  stop_event.events = EPOLLIN;
  const int add_result = epoll_ctl(reactor->epoll_fd, EPOLL_CTL_ADD,
                                   reactor->event_fd, &stop_event);
  if (add_result == -1) {
    LOG_ERROR("%s unable to register eventfd with epoll set: %s", __func__,
              strerror(errno));
    reactor_free(reactor);
    return NULL;
  }

  return reactor;
}

Run

// Runs the reactor's event loop on the calling thread.
//
// Records the calling thread in |reactor->run_thread| and keeps |is_running|
// set for as long as the loop executes. Each iteration empties the
// invalidation list, blocks in epoll_wait, and dispatches the ready events to
// the callbacks of the registered objects.
//
// |iterations| is the number of epoll_wait rounds to perform; 0 means loop
// until the reactor is stopped or epoll_wait fails.
//
// Returns REACTOR_STATUS_ERROR if epoll_wait fails, REACTOR_STATUS_STOP if the
// stop eventfd became readable, and REACTOR_STATUS_DONE once |iterations|
// rounds have completed.
static reactor_status_t run_reactor(reactor_t* reactor, int iterations) {
  CHECK(reactor != NULL);

  // reactor_unregister compares against |run_thread| to detect that it is
  // being called from inside a callback.
  reactor->run_thread = pthread_self();
  reactor->is_running = true;

  struct epoll_event events[MAX_EVENTS];
  for (int i = 0; iterations == 0 || i < iterations; ++i) {
    {
      // Entries only matter for the batch of events fetched after they were
      // added, so the list is reset before every epoll_wait.
      std::lock_guard<std::mutex> lock(*reactor->list_mutex);
      list_clear(reactor->invalidation_list);
    }

    int ret;
    // NOTE(review): OSI_NO_INTR presumably retries the call on EINTR -- confirm
    // against its definition.
    OSI_NO_INTR(ret = epoll_wait(reactor->epoll_fd, events, MAX_EVENTS, -1));
    if (ret == -1) {
      LOG_ERROR("%s error in epoll_wait: %s", __func__, strerror(errno));
      reactor->is_running = false;
      return REACTOR_STATUS_ERROR;
    }

    for (int j = 0; j < ret; ++j) {
      // The event file descriptor is the only one that registers with
      // a NULL data pointer. We use the NULL to identify it and break
      // out of the reactor loop.
      if (events[j].data.ptr == NULL) {
        // Read the pending value off the eventfd; the value itself is unused.
        eventfd_t value;
        eventfd_read(reactor->event_fd, &value);
        reactor->is_running = false;
        return REACTOR_STATUS_STOP;
      }

      reactor_object_t* object = (reactor_object_t*)events[j].data.ptr;

      // Skip objects that another thread unregistered after this batch of
      // events was fetched; |object| may already be freed in that case, so it
      // must not be dereferenced. The list lock is released when |lock| goes
      // out of scope on `continue`.
      std::unique_lock<std::mutex> lock(*reactor->list_mutex);
      if (list_contains(reactor->invalidation_list, object)) {
        continue;
      }

      // Downgrade the list lock to an object lock.
      // The object lock is taken BEFORE the list lock is dropped, so a
      // concurrent reactor_unregister (which appends to the list and then
      // waits on the object lock) cannot free |object| between the check
      // above and the callbacks below.
      {
        std::lock_guard<std::mutex> obj_lock(*object->mutex);
        lock.unlock();

        // Reset the flag so that it reflects only what this object's
        // callbacks do.
        reactor->object_removed = false;
        // Hang-up and error conditions are delivered through the read
        // callback.
        if (events[j].events & (EPOLLIN | EPOLLHUP | EPOLLRDHUP | EPOLLERR) &&
            object->read_ready)
          object->read_ready(object->context);
        // Don't call write_ready if read_ready unregistered the object.
        if (!reactor->object_removed && events[j].events & EPOLLOUT &&
            object->write_ready)
          object->write_ready(object->context);
      }

      // A callback called reactor_unregister on this thread. That path only
      // sets the flag and returns, leaving destruction to us now that the
      // object lock has been released.
      if (reactor->object_removed) {
        delete object->mutex;
        osi_free(object);
      }
    }
  }

  reactor->is_running = false;
  return REACTOR_STATUS_DONE;
}

// Runs the reactor loop on the calling thread with no iteration limit and
// returns the status that ended the loop.
reactor_status_t reactor_start(reactor_t* reactor) {
  CHECK(reactor != NULL);

  // An iteration count of zero tells run_reactor to loop without a limit.
  const int unlimited_iterations = 0;
  const reactor_status_t status = run_reactor(reactor, unlimited_iterations);
  return status;
}

// Runs exactly one round of the reactor loop (one epoll_wait plus dispatch)
// on the calling thread and returns the resulting status.
reactor_status_t reactor_run_once(reactor_t* reactor) {
  CHECK(reactor != NULL);

  const int single_iteration = 1;
  const reactor_status_t status = run_reactor(reactor, single_iteration);
  return status;
}

// Asks the reactor loop to stop by writing EVENT_REACTOR_STOP to the
// reactor's eventfd, which makes the eventfd readable in the epoll set.
void reactor_stop(reactor_t* reactor) {
  CHECK(reactor != NULL);

  const int stop_fd = reactor->event_fd;
  eventfd_write(stop_fd, EVENT_REACTOR_STOP);
}

Register

// Registers |fd| with |reactor| and returns the object that represents the
// registration.
//
// |context| is passed back unchanged to the callbacks. |read_ready| and
// |write_ready| may each be NULL; interest in the corresponding event is only
// requested for a non-NULL callback.
//
// Returns NULL (after releasing the partially built object) if the descriptor
// cannot be added to the epoll set.
reactor_object_t* reactor_register(reactor_t* reactor, int fd, void* context,
                                   void (*read_ready)(void* context),
                                   void (*write_ready)(void* context)) {
  CHECK(reactor != NULL);
  CHECK(fd != INVALID_FD);

  reactor_object_t* entry =
      (reactor_object_t*)osi_calloc(sizeof(reactor_object_t));
  entry->mutex = new std::mutex;
  entry->reactor = reactor;
  entry->fd = fd;
  entry->context = context;
  entry->read_ready = read_ready;
  entry->write_ready = write_ready;

  // The epoll entry carries a pointer back to the object so the loop can find
  // the callbacks when the descriptor becomes ready.
  struct epoll_event interest;
  memset(&interest, 0, sizeof(interest));
  interest.data.ptr = entry;
  interest.events = (read_ready ? (EPOLLIN | EPOLLRDHUP) : 0) |
                    (write_ready ? EPOLLOUT : 0);

  if (epoll_ctl(reactor->epoll_fd, EPOLL_CTL_ADD, fd, &interest) != -1) {
    return entry;
  }

  LOG_ERROR("%s unable to register fd %d to epoll set: %s", __func__, fd,
            strerror(errno));
  delete entry->mutex;
  osi_free(entry);
  return NULL;
}

// Replaces the callbacks of an existing registration and updates the epoll
// interest set to match: read interest only if |read_ready| is non-NULL,
// write interest only if |write_ready| is non-NULL.
//
// Returns false, leaving the stored callbacks untouched, if the epoll set
// cannot be modified; returns true otherwise.
bool reactor_change_registration(reactor_object_t* object,
                                 void (*read_ready)(void* context),
                                 void (*write_ready)(void* context)) {
  CHECK(object != NULL);

  struct epoll_event interest;
  memset(&interest, 0, sizeof(interest));
  interest.data.ptr = object;
  interest.events = (read_ready ? (EPOLLIN | EPOLLRDHUP) : 0) |
                    (write_ready ? EPOLLOUT : 0);

  const int mod_result = epoll_ctl(object->reactor->epoll_fd, EPOLL_CTL_MOD,
                                   object->fd, &interest);
  if (mod_result == -1) {
    LOG_ERROR("%s unable to modify interest set for fd %d: %s", __func__,
              object->fd, strerror(errno));
    return false;
  }

  {
    // The callbacks are swapped under the object lock, which the reactor
    // loop also holds while it invokes them.
    std::lock_guard<std::mutex> guard(*object->mutex);
    object->read_ready = read_ready;
    object->write_ready = write_ready;
  }

  return true;
}

Unregister

// Removes |obj| from its reactor's epoll set and destroys it.
//
// When called on the reactor thread while the loop is running (i.e. from
// inside a callback), destruction is deferred: only |object_removed| is set
// and run_reactor frees |obj| once the callbacks have returned. In every
// other case |obj| is freed before this function returns, so the caller must
// not use it afterwards.
void reactor_unregister(reactor_object_t* obj) {
  CHECK(obj != NULL);

  reactor_t* reactor = obj->reactor;

  // A failure to remove the descriptor is logged but does not stop the
  // teardown of |obj|.
  if (epoll_ctl(reactor->epoll_fd, EPOLL_CTL_DEL, obj->fd, NULL) == -1)
    LOG_ERROR("%s unable to unregister fd %d from epoll set: %s", __func__,
              obj->fd, strerror(errno));

  // Called from a callback on the reactor thread: run_reactor currently holds
  // the object lock, so |obj| cannot be destroyed here. Flag it and let
  // run_reactor free it after the callbacks return.
  if (reactor->is_running &&
      pthread_equal(pthread_self(), reactor->run_thread)) {
    reactor->object_removed = true;
    return;
  }

  // Publish the removal so the reactor thread skips any event for |obj| that
  // it has already fetched from epoll_wait.
  {
    std::unique_lock<std::mutex> lock(*reactor->list_mutex);
    list_append(reactor->invalidation_list, obj);
  }

  // Taking the object lock here makes sure a callback for |obj| isn't
  // currently executing. The reactor thread must then either be before
  // the callbacks or after. If after, we know that the object won't be
  // referenced because it has been taken out of the epoll set. If before,
  // it won't be referenced because the reactor thread will check the
  // invalidation_list and find it in there. So by taking this lock, we
  // are waiting until the reactor thread drops all references to |obj|.
  // Once the wait completes, we can unlock and destroy |obj| safely.
  obj->mutex->lock();
  obj->mutex->unlock();
  delete obj->mutex;
  osi_free(obj);
}

Deinitialization

// Releases everything owned by |reactor|: the invalidation list, the list
// mutex, both file descriptors, and the reactor structure itself.
//
// Accepts NULL, in which case it does nothing. Also used by reactor_new to
// tear down a partially constructed reactor.
void reactor_free(reactor_t* reactor) {
  if (!reactor) return;

  list_free(reactor->invalidation_list);
  // |list_mutex| is allocated with `new` in reactor_new and was previously
  // never released. `delete` on a null pointer is a no-op, which covers the
  // reactor_new error paths that run before the mutex is created.
  // NOTE(review): that relies on osi_calloc zero-filling the struct -- confirm.
  delete reactor->list_mutex;
  close(reactor->event_fd);
  close(reactor->epoll_fd);
  osi_free(reactor);
}

点赞(0) 打赏

评论列表 共有 0 条评论

暂无评论

微信公众账号

微信扫一扫加关注

发表
评论
返回
顶部