模块概述

本模块主要进行线程及各类线程同步机制的封装,提供线程类(thread.h)和线程同步类(mutex.h)。

线程类:

  • Thread:构造函数传入线程入口函数和线程名称。线程类构造之后线程即开始运行,构造函数在确保线程真正开始运行之后返回。

线程同步类:

  • Semaphore: 计数信号量,基于sem_t实现
  • Mutex: 互斥锁,基于pthread_mutex_t实现
  • RWMutex: 读写锁,基于pthread_rwlock_t实现
  • Spinlock: 自旋锁,基于pthread_spinlock_t实现

几个注意点

Q:为什么不直接使用C++提供的std::thread,而是选择封装pthread_t?

A:C++的std::thread也是基于pthread实现的,但它的线程同步机制不完善,没有提供互斥量,RWMutex,Spinlock等应对高并发场景的线程同步机制。所以选择自己封装


Q:线程入口函数只支持void(void)吗?

A:实际使用时可以结合std::bind来绑定参数,这样就相当于支持任何类型和数量的参数


Q:有了协程,为什么还需要线程?

A:只有协程无法利用多Cpu,需要使用线程弥补协程的一些缺点,进行互补

线程类Thread

thread.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
class Thread: public Noncopyable{
public:
using ptr = std::shared_ptr<Thread>;

Thread(std::function<void()> cb, const std::string& name);

~Thread();

pid_t getId() const { return m_id; }

std::string getName() const { return m_name; }

// 线程的join操作:阻塞等待线程执行完成
void join();

// 获取当前线程的指针
static Thread* GetThis();

// 获取当前线程的名称
static std::string GetName();

// 设置当前线程的名称
static void SetName(const std::string& name);

// 初始化线程属性并执行线程的任务函数
static void* run(void* arg);

private:
// 全局的线程id(内核态id),主要用于日志等操作
pid_t m_id = -1;
// pthread返回的线程id标识(用户态id)
pthread_t m_thread = 0;
// 线程需要执行的任务函数
std::function<void()> m_cb;
// 线程名称
std::string m_name;
// 信号量
Semaphore m_semaphore;
};

Noncopyable

线程对象理论上是不能拷贝的,所以需要禁用它的拷贝构造与拷贝赋值。通过继承一个Noncopyable类来实现这一操作

1
2
3
4
5
6
7
8
9
10
class Noncopyable{
public:
Noncopyable() = default;

~Noncopyable() = default;

Noncopyable(const Noncopyable&) = delete;

Noncopyable& operator=(const Noncopyable&) = delete;
};

构造函数

调用pthread_create创建子线程,传入Thread::run()作为回调函数,传入Thread实例自身(this)作为run()的参数。实际的初始化操作在run()中完成(run()运行在子线程中)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Thread::Thread(std::function<void()> cb, const std::string& name)
: m_cb(cb)
, m_name(name){
if(name.empty()){
m_name = "UNKNOW";
}
// pthread_create:创建线程,创建成功返回0
// 通过传递 this 指针,可以在新线程的上下文中访问调用 Thread 的当前实例对象,
// 从而允许线程函数 run() 操作该对象的成员变量和成员函数
int rt = pthread_create(&m_thread, nullptr, &Thread::run, this);
if(rt){
SYLAR_LOG_ERROR(g_logger) << "pthread_create fail, errnum=" << rt
<< " name=" << name;
throw std::logic_error("pthread_create error");
}
// 阻塞信号量,等待run()中线程初始化完成
m_semaphore.wait();
}

信号量

线程类需要保证在构造完成之后线程函数一定已经处于运行状态,这里通过一个信号量来实现的,构造函数在创建线程后会一直阻塞,直到线程函数运行并且通知信号量,构造函数才会返回,而构造函数一旦返回,就说明线程函数已经在执行了。细节如下:

  • 构造函数中对信号量上锁,run()中完成线程初始化后再对信号量解锁
  • 这样操作保证了只有Thread初始化完成后主线程才能调用该线程实例
  • 虽然wait操作和notify操作是分别在主线程和子线程中执行的,但他们操作的是同一个Thread实例的m_semaphore成员变量

run

run()子线程的入口函数,执行实际的线程初始化操作,初始化线程属性并运行真正的子线程任务函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
void* Thread::run(void* arg){
// pthread_create中,run接收一个this,即Thread*参数
Thread* thread = (Thread*)arg;

// 初始化当前线程的属性并绑定到线程局部存储
t_thread = thread;
t_thread_name = thread->m_name;
thread->m_id = sylar::GetThreadId();

// pthread_setname_np:设置线程名称,只取m_name的前15个字符(Linux 限制线程名称最长为 16 字符,包括末尾'\0')
pthread_setname_np(pthread_self(), thread->m_name.substr(0, 15).c_str());

// 将线程的任务回调函数移动到局部变量 cb,避免线程对象中的 m_cb 被多线程并发访问
std::function<void()> cb;
cb.swap(thread->m_cb);

// 线程初始化完成后对信号量解锁
thread->m_semaphore.notify();

// 执行任务函数
cb();

return 0;
}

几个注意点:

  • run的参数:主线程的Thread构造函数中,pthread_create给run指定了一个参数this,即主线程中创建的Thread对象自身,这一操作使得run可以在子线程中操作主线程中创建的Thread对象

  • GetThreadId():通过系统调用返回一个系统范围内唯一的线程id,可以用于日志等操作

    1
    2
    3
    pid_t GetThreadId(){
    return syscall(SYS_gettid);
    }
  • cb.swap:这一步实际执行了两步操作:

    1. 获取回调函数
    2. 清空m_cb,避免它被多线程并发访问

线程局部量

thread_local static变量:

  • static:生命周期为静态的(直到程序结束才销毁)
  • thread_local:线程局部存储变量,每个线程有独立的实例

声明两个线程局部量:

  • t_thread:为每个线程提供一个指向当前 Thread 对象的指针,便于在全局范围内访问与当前线程关联的 Thread 实例,而无需显式传递线程实例
  • t_thread_name:本线程的名字
1
2
static thread_local Thread* t_thread = nullptr;
static thread_local std::string t_thread_name = "UNKNOW";

与其对应的两个接口,通过接口使得用户无须访问线程对象就能获取到当前线程的属性:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Thread* Thread::GetThis(){
return t_thread;
}
std::string Thread::GetName(){
return t_thread_name;
}

void Thread::SetName(const std::string& name){
if(name.empty()){
return;
}
// 如果当前线程已经创建成功,就设置其name
if(t_thread){
t_thread->m_name = name;
}
// 设置全局的线程name值
t_thread_name = name;
}

线程资源回收

join

封装pthread_join,阻塞等待工作线程完成,并重置m_thread:

1
2
3
4
5
6
7
8
9
10
11
12
13
void Thread::join(){
if(m_thread){
// 对当前线程执行join
int rt = pthread_join(m_thread, nullptr);
if(rt){
SYLAR_LOG_ERROR(g_logger) << "pthread_joib fail, errnum=" << rt
<< " name=" << m_name;
throw std::logic_error("pthread_join error");
}
// 重置m_thread
m_thread = 0;
}
}

析构函数

析构函数中调用pthread_detach进行线程分离,实现析构时自动回收线程资源:

1
2
3
4
5
6
Thread::~Thread(){
// 对当前线程进行detach
if(m_thread){
pthread_detach(m_thread);
}
}

线程同步类Mutex

该部分主要使用RAII机制封装各类线程同步机制,包括信号量、读写锁、自旋锁

信号量Semaphore

RAII封装semaphore_t 实现信号量机制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class Semaphore: public Noncopyable{
public:
explicit Semaphore(uint32_t count = 0);

~Semaphore();

// 信号量P操作:获取/等待信号量
void wait();

// 信号量V操作:释放信号量
void notify();

private:
// 信号量变量
sem_t m_semaphore;
};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
Semaphore::Semaphore(uint32_t count){
if(sem_init(&m_semaphore, 0, count)){
SYLAR_LOG_ERROR(g_logger) << "sem_init error";
throw std::logic_error("sem_init error");
}
}

Semaphore::~Semaphore(){
sem_destroy(&m_semaphore);
}

void Semaphore::wait(){
// sem_t不需要注意虚假唤醒问题
if(sem_wait(&m_semaphore)){
SYLAR_LOG_ERROR(g_logger) << "sem_wait error";
throw std::logic_error("sem_wait error");
}
}

void Semaphore::notify(){
if(sem_post(&m_semaphore)){
SYLAR_LOG_ERROR(g_logger) << "sem_post error";
throw std::logic_error("sem_post error");
}
}

可能的优化点:改为使用std::condition_variable实现信号量

  • sem_t的wait和notify操作会发生用户态与内核态间的切换,而std::condition_variable为纯用户态,无须切换到内核态
  • 对于旨在避免内核态的上下文切换的协程,选用std::condition_variable可能更为合适
  • sem_t主要在跨进程间操作上有着不可替代性

局部锁ScopedLockImpl

RAII封装局部锁:接收一个锁,构造时自动加锁,析构时自动解锁

功能类似std::lock_guard,但支持手动上锁/解锁

后续每一个锁都可以声明一个Lock类型成员,用于实现范围锁,如:

1
using Lock = ScopedLockImpl<Mutex>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
template<typename T>
struct ScopedLockImpl{
public:
ScopedLockImpl(T& mutex)
: m_mutex(mutex)
, m_locked(false){
lock();
}

~ScopedLockImpl(){
unlock();
}

// 上锁操作
void lock(){
if(!m_locked){
m_mutex.lock();
m_locked = true;
}
}

// 解锁操作
void unlock(){
if(m_locked){
m_mutex.unlock();
m_locked = false;
}
}

private:
T& m_mutex;
// 当前锁的状态
bool m_locked;
};

读写锁RWMutex

RAII封装pthread_rwlock_t

读写锁可实现高并发读,支持多个线程同时读取,**适合读多写少的场景**

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class RWMutex: public Noncopyable{
public:
using ReadLock = ReadScopedLockImpl<RWMutex>;
using WriteLock = WriteScopedLockImpl<RWMutex>;

RWMutex(){
pthread_rwlock_init(&m_mutex, nullptr);
}

~RWMutex(){
pthread_rwlock_destroy(&m_mutex);
}

void rdlock(){
pthread_rwlock_rdlock(&m_mutex);
}

void wrlock(){
pthread_rwlock_wrlock(&m_mutex);
}

void unlock(){
pthread_rwlock_unlock(&m_mutex);
}

private:
pthread_rwlock_t m_mutex;
};

读写局部锁

接收一个锁(一般是一个RWMutex锁),构造时自动上锁,析构时自动解锁

使用atomic变量来保证对m_locked操作的线程安全性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
// RAII类封装局部写锁
template<typename T>
struct ReadScopedLockImpl{
public:
ReadScopedLockImpl(T& mutex)
: m_mutex(mutex)
, m_locked(false){
lock();
}

~ReadScopedLockImpl(){
unlock();
}

// 上锁操作
void lock(){
if(!m_locked){
m_mutex.rdlock();
m_locked = true;
}
}

// 解锁操作
void unlock(){
if(m_locked){
m_mutex.unlock();
m_locked = false;
}
}

private:
T& m_mutex;
// 使用atomic变量来保证m_locked的多线程安全性
std::atomic<bool> m_locked;
};


// RAII类封装范围锁
// 接收一个锁,构造时自动加锁,析构时自动解锁
template<typename T>
struct WriteScopedLockImpl{
public:
WriteScopedLockImpl(T& mutex)
: m_mutex(mutex)
, m_locked(false){
lock();
}

~WriteScopedLockImpl(){
unlock();
}

// 上锁操作
void lock(){
if(!m_locked){
m_mutex.wrlock();
m_locked = true;
}
}

// 解锁操作
void unlock(){
if(m_locked){
m_mutex.unlock();
m_locked = false;
}
}

private:
T& m_mutex;
// 使用atomic变量来保证m_locked的多线程安全性
std::atomic<bool> m_locked;
};

互斥量Mutex

RAII封装pthread_mutex_t

互斥量与读写锁的对比:

  • 读写锁:可实现高并发读,支持多个线程同时读取,适合读多写少的场景
  • 互斥量:更简单高效,适合 读写均衡或写多读少 的场景
  • 如果初始需求不明确, 优先使用互斥量,后续可以根据性能瓶颈替换为读写锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class Mutex{
public:
using Lock = ScopedLockImpl<Mutex>;

Mutex(){
pthread_mutex_init(&m_mutex, 0);
}

~Mutex(){
pthread_mutex_destroy(&m_mutex);
}

void lock(){
pthread_mutex_lock(&m_mutex);
}

void unlock(){
pthread_mutex_unlock(&m_mutex);
}

private:
pthread_mutex_t m_mutex;
};

自旋锁SpinLock

RAII封装pthread_spinlock_t

自旋锁使锁被占用时,线程进入忙等状态,所以自旋锁适合 短时间持有锁的场景

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class SpinLock{
public:
using Lock = ScopedLockImpl<SpinLock>;

SpinLock(){
pthread_spin_init(&m_mutex, 0);
}

~SpinLock(){
pthread_spin_destroy(&m_mutex);
}

void lock(){
pthread_spin_lock(&m_mutex);
}

void unlock(){
pthread_spin_unlock(&m_mutex);
}

private:
pthread_spinlock_t m_mutex;
};