c++ Multithreading programming 学习(1)

Thread Overview


Thread 是一段独立于其他代码的,由操作系统调度的指令。它能使用process的资源,但是可以独立的被OS调用。 它像是轻量级的process,但是创建比process快。 同时thread可以直接与同一process下其他thread通信。 而process之间用到PIPE,FIFO来发送短小高频的消息,适用于两个process之间。或者用共享内存以及socket来通信。

Pthread

Overview

在c++11以前,c++没有很好的thread库支持,一般是通过系统的thread库来实现线程相关代码。Pthread即源于posix系统。代码基于c,对于其他操作系统不具有移植性。

使用pthread

对于posix系统,需要包含头文件#include <pthread.h> ,如果需要使用semaphre, 则需要#include <semaphore.h>。 在编译时,需要g++ test.cpp -lpthread 或者在cmake中定义find_package(Threads REQUIRED)

Thread Creation

API

int pthread_create(pthread_t *thread,
              const pthread_attr_t *attr,
              void *(*start_routine)(void*), void *arg);
void pthread_exit(void *value_ptr);
int pthread_cancel(pthread_t thread);
int pthread_attr_init(pthread_attr_t *attr);
int pthread_attr_destroy(pthread_attr_t *attr);

pthread_create 创建一个新的线程并运行它.它能在代码的任何处被多次调用.
pthread_create 的参数:
thread:返回新 thread 的唯一标识.
attr:设置 thread 的性质.NULL 为默认性质.
start_routine: 新 thread 运行的函数指针.
arg:传给 start_routine 的参数,必须强制转换成 void *.NULL 为没有参数传入.

Thread Attributes

pthread_attr_initpthread_attr_destroy 被用来初始化/销毁 thread 性质对象. 如 detached or joinable state, scheduling policy.

Thread Binding

参见这篇文章cpu 亲和性

Thread Termination

Thread 有多种终止方式:

  • 线程从它的运行中正常放回.它的工作完成.
  • 线程调用 pthread_exit 无论它的工作完成否.
  • 线程被另外一个线程调用pthread_cance来取消.
  • 整个进程都终止,如果任何线程调用了 exec()exit().
  • main() 函数先完成,没有调用 pthread_exit.

void pthread_exit(void * rval_ptr);
函数说明:rval_ptr参数是线程结束时的返回值,可由其他函数如pthread_join()来获取。这个调用不关闭文件,在线程打开的任何文件在线程终止后将继续打开.

  • 如果 main()在它创建的 threads 之前终止,并没有显式的调用 pthread_exit(),所有创建的线程都将终止,因为main()结束,不再存在支持这些线程.
  • 通过main()在最后调用 pthread_exit(), main()将阻塞并保持存活来支持它创建的线程运行直到它们完成.

int pthread_cancel(pthread_t thread);
函数说明:取消线程,该函数在其他线程中调用,用来强行杀死指定的线程。
我从来没用过这个函数,似乎使用情况有些tricky,参见这篇博客。另外我自己写了个程序试验了一下但是一直有segment fault TODO: 查明原因,深入理解一下。

Example of pthread creationand termination

如果注释掉 main()中最后的 pthread_exit(NULL); ,那么它创建的线程将会完成不了所有的打印而被强制退出.

#include <pthread.h>
#include <cstdio>
#include <cstdlib>

void *ThreadProc(void *param) {
  int id;
  id = *(static_cast<int *>(param));
  for (int i = 0; i < 10; ++i) {
    if(id == 1 && i == 8)
    { 
        // for pthread_t 1, it will only print to 7 and then terminate.
        pthread_exit(NULL);
    }
    printf("thread %d: run %d \n", id, i);
  }
  pthread_exit(NULL);
}

int main(int argc, char *argv[]) {
  const int kNumThreads = 4;
  pthread_t threads[kNumThreads];
  int thread_ids[kNumThreads];
  for (int i = 0; i < kNumThreads; ++i) {
    thread_ids[i] = i;
    int rt = pthread_create(&threads[i], NULL, ThreadProc,
                            static_cast<void *>(&thread_ids[i]));
    if (rt) {
      printf("ERROR: pthread_create failed, rt=%d\n", rt);
      exit(1);
    }
  }
  pthread_exit(NULL);
}

Threads joining and detaching

API

int pthread_join(pthread_t thread, void **value_ptr);
int pthread_detach(pthread_t thread);
int pthread_attr_setdetachstate(pthread_attr_t *attr, int detachstate);
int pthread_attr_getdetachstate(const pthread_attr_t *attr, int *detachstate);
//PTHREAD_CREATE_DETACHED 分离
//PTHREAD_CREATE_JOINABLE 不分离

Joining


joining 是用来同步不同线程的方法之一

  • int pthread_join(pthread_t thread, void **value_ptr); 将阻塞调用它的线程直到被指定的thread线程终止。
  • 调用的线程能获取目标线程终止返回的 status 如果目标线程调用 pthread_exit()
  • 当一个线程被创建,它的属性之一是它是否可以 join.只有创建的能被 join 的线程才能被 join.如果线程线程以 detached 创建,它永远都不能被 join。

Detaching

  • pthread_detach() 可以将一个线程detach,即使它原先是以join位attribute建立的

Example

#include <pthread.h>
#include <cstdio>
#include <cstdlib>

void *ThreadProc(void *param) {
  int id;
  id = *(static_cast<int *>(param));
  for (int i = 0; i < 10; ++i) {
    printf("thread %d: run %d \n", id, i);
  }
  pthread_exit(param);
}

int main(int argc, char *argv[]) {
  const int kNumThreads = 4;
  pthread_t threads[kNumThreads];
  int thread_ids[kNumThreads];
  pthread_attr_t attr;

  pthread_attr_init(&attr);
  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);

  for (int i = 0; i < kNumThreads; ++i) {
    thread_ids[i] = i;
    int rt = pthread_create(&threads[i], &attr, ThreadProc,
                            static_cast<void *>(&thread_ids[i]));
    if (rt) {
      printf("ERROR: pthread_create failed, rt=%d\n", rt);
      exit(1);
    }
  }
  for (int i = 0; i < kNumThreads; ++i) {
    void *status;
    int rt = pthread_join(threads[i], &status);
    if (rt) {
      printf("ERROR: pthread_join failed, rt=%d\n", rt);
      exit(1);
    }
    printf("completed join with thread %d having a status of %d\n"
           , i, *static_cast<int *>(status));
  }
  pthread_exit(NULL);
}

Stack Management

API

int pthread_attr_getstacksize(const pthread_attr_t *restrict attr,
              size_t *restrict stacksize);
int pthread_attr_setstacksize(pthread_attr_t *attr, size_t stacksize);
int pthread_attr_getstackaddr(const pthread_attr_t *restrict attr,
              void **restrict stackaddr); // removed in POSIX.1-2008
int pthread_attr_setstackaddr(pthread_attr_t *attr, void *stackaddr); 
int pthread_attr_setstack(pthread_attr_t *attr, void *stackaddr, size_t stacksize);
int pthread_attr_getstack(const pthread_attr_t *attr,  void **stackaddr, size_t *stacksize);
int pthread_attr_setguardsize(pthread_attr_t *attr, size_t guardsize);
//功能:设置线程属性中栈尾的警戒区大小
int pthread_attr_getguardsize(pthread_attr_t *attr, size_t *guardsize);
//功能:获取线程属性中栈尾的警戒区大小

每个线程都有各自独立的 stack, pthread_attr_getstackaddrpthread_attr_setstackaddr 分别获取和设置线程的栈底地址. 在POSIX.1-2008被删除,需要使用新的api pthread_attr_setstack, pthread_attr_getstack.get/setstacksize()获取和设置stack的栈空间字节数。我们通过attr在pthread_create()传入

example

#include <pthread.h>
#include <cstdio>
#include <cstdlib>

pthread_attr_t attr;

void *ThreadProc(void *param) {
  int id;
  size_t thread_stack_size;
  id = *(static_cast<int *>(param));
  pthread_attr_getstacksize(&attr, &thread_stack_size);
  printf("thread %d: stack size = %d\n", id, thread_stack_size);
  for (int i = 0; i < 10; ++i) {
    printf("thread %d: run %d \n", id, i);
  }
  pthread_exit(NULL);
}

int main(int argc, char *argv[]) {
  const int kNumThreads = 4;
  const int kThround = 1000;
  pthread_t threads[kNumThreads];
  int thread_ids[kNumThreads];
  size_t stack_size;

  pthread_attr_init(&attr);
  pthread_attr_getstacksize(&attr, &stack_size);
  printf("Default stack size = %d\n", stack_size);
  stack_size = sizeof(double) * kThround * kThround;
  printf("Setting stack size = %d\n", stack_size);
  pthread_attr_setstacksize(&attr, stack_size);
  for (int i = 0; i < kNumThreads; ++i) {
    thread_ids[i] = i;
    int rt = pthread_create(&threads[i], &attr, ThreadProc,
                            static_cast<void *>(&thread_ids[i]));
    if (rt) {
      printf("ERROR: pthread_create failed, rt=%d\n", rt);
      exit(1);
    }
  }
  pthread_exit(NULL);
  pthread_attr_destroy(&attr);
  return 0;
}

Thread Attribute(上文中未提到的)

API

typedef union
{
char __size[__SIZEOF_PTHREAD_ATTR_T];
long int __align;
}pthread_attr_t;

int pthread_attr_setscope(pthread_attr_t *attr, int scope);
//功能:设置线程属性中线程的竞争范围
//PTHREAD_SCOPE_SYSTEM(绑定)
//PTHREAD_SCOPE_PROCESS(非绑定)
int pthread_attr_getscope(pthread_attr_t *attr, int *scope);
//功能:获取线程属性中线程的竞争范围

关于绑定属性,涉及到另外一个概念:轻进程(Light Weight Process,LWP)。轻进程可以理解为内核进程,它位于用户层和内核层之间。系统对线程资源的分配和对线程的控制时通过轻进程来实现的,一个轻进程可以控制一个或多个线程。默认情况下,启动多少轻进程、哪些轻进程来控制哪些线程是由系统来控制的,这种状况即称为非绑定。绑定状况下,则顾名思义,即某个线程固定地绑在一个轻进程之上。被绑定的线程具有较高的响应速度,这是因为CPU时间片的调度是面向轻进程的,绑定的线程可以保证在需要的时候它总有一个轻进程可用。通过设置被绑定的轻进程的优先级和调度级可以使得绑定的线程满足诸如实时反应之类的要求。

int pthread_attr_setinheritsched(pthread_attr_t *attr, int inheritsched);
//功能:设置线程属性中线程的调度策略来源
//PTHREAD_INHERIT_SCHED 继承创建者
//PTHREAD_EXPLICIT_SCHED 单独设置
int pthread_attr_getinheritsched(pthread_attr_t *attr, int *inheritsched);

int pthread_attr_setschedpolicy(pthread_attr_t *attr, int policy);
//功能:设置线程属性中线程的调度策略
//SCHED_FIFO 先进先出策略
//SCHED_RR 轮转策略
//SCHED_OTHER 缺省
int pthread_attr_getschedpolicy(pthread_attr_t *attr, int *policy);
//功能:获取线程属性中线程的调度策略

struct sched_param {
int sched_priority;
};
int pthread_attr_setschedparam(pthread_attr_t *attr, const struct sched_param *param);
//功能:设置线程属性中线程的调度参数(优先级别)
//param:最高级别0
int pthread_attr_getschedparam(pthread_attr_t *attr, struct sched_param *param);

使用方法:

  1. 定义线程属性结构体 pthread_attr_t attr;
  2. 初始化线程属性结构体 pthread_attr_init(&attr);
  3. 使用pthread_attr_set系列函数对结构体变量进行设置。
  4. 在创建线程时(pthread_create函数的第二个参数)中使用线程属性结构变量创建线程。
pthread_t pthread_self(void);
int pthread_equal(pthread_t t1, pthread_t t2);
int pthread_once(pthread_once_t *once_control,
              void (*init_routine)(void));
pthread_once_t once_control = PTHREAD_ONCE_INIT;
  • pthread_self 返回调用线程的唯一 thread ID.
  • pthread_equal 比较两个线程 ID 是否相等.
  • pthread_once 本函数使用初值为PTHREAD_ONCE_INIT的once_control变量保证init_routine()函数在本进程执行序列中仅执行一次。在多线程编程环境下,尽管pthread_once()调用会出现在多个线程中,init_routine()函数仅执行一次,究竟在哪个线程中执行是不定的,是由内核调度来决定。Linux Threads使用互斥锁和条件变量保证由pthread_once()指定的函数执行且仅执行一次,而once_control表示是否执行过。如果once_control的初值不是PTHREAD_ONCE_INIT(Linux Threads定义为0),pthread_once() 的行为就会不正常。在Linux中,实际"一次性函数"的执行状态有三种:NEVER(0)、IN_PROGRESS(1)、DONE (2),如果once初值设为1,则由于所有pthread_once()都必须等待其中一个激发"已执行一次"信号,因此所有pthread_once ()都会陷入永久的等待中;如果设为2,则表示该函数已执行过一次,从而所有pthread_once()都会立即返回0