模块概述
本模块主要进行线程及各类线程同步机制的封装,提供线程类(thread.h)和线程同步类(mutex.h)。
线程类:
Thread
:构造函数传入线程入口函数和线程名称。线程类构造之后线程即开始运行,构造函数在确保线程真正开始运行之后返回。
线程同步类:
Semaphore
: 计数信号量,基于sem_t
实现
Mutex
: 互斥锁,基于pthread_mutex_t
实现
RWMutex
: 读写锁,基于pthread_rwlock_t
实现
Spinlock
: 自旋锁,基于pthread_spinlock_t
实现
几个注意点
Q:为什么不直接使用C++提供的std::thread,而是选择封装pthread_t?
A:C++的std::thread也是基于pthread实现的,但它的线程同步机制不完善,没有提供互斥量,RWMutex,Spinlock等应对高并发场景的线程同步机制。所以选择自己封装
Q:线程入口函数只支持void(void)吗?
A:实际使用时可以结合std::bind来绑定参数,这样就相当于支持任何类型和数量的参数
Q:有了协程,为什么还需要线程?
A:只有协程无法利用多Cpu,需要使用线程弥补协程的一些缺点,进行互补
线程类Thread
thread.h
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| class Thread: public Noncopyable{ public: using ptr = std::shared_ptr<Thread>;
Thread(std::function<void()> cb, const std::string& name);
~Thread();
pid_t getId() const { return m_id; }
std::string getName() const { return m_name; }
void join();
static Thread* GetThis();
static std::string GetName();
static void SetName(const std::string& name);
static void* run(void* arg);
private: pid_t m_id = -1; pthread_t m_thread = 0; std::function<void()> m_cb; std::string m_name; Semaphore m_semaphore; };
|
Noncopyable
线程对象理论上是不能拷贝的,所以需要禁用它的拷贝构造与拷贝赋值。通过继承一个Noncopyable类来实现这一操作
1 2 3 4 5 6 7 8 9 10
| class Noncopyable{ public: Noncopyable() = default;
~Noncopyable() = default;
Noncopyable(const Noncopyable&) = delete;
Noncopyable& operator=(const Noncopyable&) = delete; };
|
构造函数
调用pthread_create创建子线程,传入Thread::run()
作为回调函数,传入Thread实例自身(this)作为run()的参数。实际的初始化操作在run()
中完成(run()
运行在子线程中)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| Thread::Thread(std::function<void()> cb, const std::string& name) : m_cb(cb) , m_name(name){ if(name.empty()){ m_name = "UNKNOW"; } int rt = pthread_create(&m_thread, nullptr, &Thread::run, this); if(rt){ SYLAR_LOG_ERROR(g_logger) << "pthread_create fail, errnum=" << rt << " name=" << name; throw std::logic_error("pthread_create error"); } m_semaphore.wait(); }
|
信号量
线程类需要保证在构造完成之后线程函数一定已经处于运行状态,这里通过一个信号量来实现的,构造函数在创建线程后会一直阻塞,直到线程函数运行并且通知信号量,构造函数才会返回,而构造函数一旦返回,就说明线程函数已经在执行了。细节如下:
- 构造函数中对信号量上锁,
run()
中完成线程初始化后再对信号量解锁
- 这样操作保证了只有Thread初始化完成后主线程才能调用该线程实例
- 虽然wait操作和notify操作是分别在主线程和子线程中执行的,但他们操作的是同一个Thread实例的m_semaphore成员变量
run
run()
是子线程的入口函数,执行实际的线程初始化操作,初始化线程属性并运行真正的子线程任务函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| void* Thread::run(void* arg){ Thread* thread = (Thread*)arg;
t_thread = thread; t_thread_name = thread->m_name; thread->m_id = sylar::GetThreadId();
pthread_setname_np(pthread_self(), thread->m_name.substr(0, 15).c_str());
std::function<void()> cb; cb.swap(thread->m_cb);
thread->m_semaphore.notify();
cb();
return 0; }
|
几个注意点:
线程局部量
thread_local static
变量:
- static:生命周期为静态的(直到程序结束才销毁)
- thread_local:线程局部存储变量,每个线程有独立的实例
声明两个线程局部量:
t_thread
:为每个线程提供一个指向当前 Thread 对象的指针,便于在全局范围内访问与当前线程关联的 Thread 实例,而无需显式传递线程实例
t_thread_name
:本线程的名字
1 2
| static thread_local Thread* t_thread = nullptr; static thread_local std::string t_thread_name = "UNKNOW";
|
与其对应的两个接口,通过接口使得用户无须访问线程对象就能获取到当前线程的属性:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| Thread* Thread::GetThis(){ return t_thread; } std::string Thread::GetName(){ return t_thread_name; }
void Thread::SetName(const std::string& name){ if(name.empty()){ return; } if(t_thread){ t_thread->m_name = name; } t_thread_name = name; }
|
线程资源回收
join
封装pthread_join,阻塞等待工作线程完成,并重置m_thread:
1 2 3 4 5 6 7 8 9 10 11 12 13
| void Thread::join(){ if(m_thread){ int rt = pthread_join(m_thread, nullptr); if(rt){ SYLAR_LOG_ERROR(g_logger) << "pthread_joib fail, errnum=" << rt << " name=" << m_name; throw std::logic_error("pthread_join error"); } m_thread = 0; } }
|
析构函数
析构函数中调用pthread_detach进行线程分离,实现析构时自动回收线程资源:
1 2 3 4 5 6
| Thread::~Thread(){ if(m_thread){ pthread_detach(m_thread); } }
|
线程同步类Mutex
该部分主要使用RAII机制封装各类线程同步机制,包括信号量、读写锁、自旋锁
信号量Semaphore
RAII封装semaphore_t
实现信号量机制
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| class Semaphore: public Noncopyable{ public: explicit Semaphore(uint32_t count = 0);
~Semaphore();
void wait();
void notify();
private: sem_t m_semaphore; };
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| Semaphore::Semaphore(uint32_t count){ if(sem_init(&m_semaphore, 0, count)){ SYLAR_LOG_ERROR(g_logger) << "sem_init error"; throw std::logic_error("sem_init error"); } }
Semaphore::~Semaphore(){ sem_destroy(&m_semaphore); }
void Semaphore::wait(){ if(sem_wait(&m_semaphore)){ SYLAR_LOG_ERROR(g_logger) << "sem_wait error"; throw std::logic_error("sem_wait error"); } }
void Semaphore::notify(){ if(sem_post(&m_semaphore)){ SYLAR_LOG_ERROR(g_logger) << "sem_post error"; throw std::logic_error("sem_post error"); } }
|
可能的优化点:改为使用std::condition_variable实现信号量
- sem_t的wait和notify操作会发生用户态与内核态间的切换,而std::condition_variable为纯用户态,无须切换到内核态
- 对于旨在避免内核态的上下文切换的协程,选用std::condition_variable可能更为合适
- sem_t主要在跨进程间操作上有着不可替代性
局部锁ScopedLockImpl
RAII封装局部锁:接收一个锁,构造时自动加锁,析构时自动解锁
功能类似std::lock_guard
,但支持手动上锁/解锁
后续每一个锁都可以声明一个Lock类型成员,用于实现范围锁,如:
1
| using Lock = ScopedLockImpl<Mutex>
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| template<typename T> struct ScopedLockImpl{ public: ScopedLockImpl(T& mutex) : m_mutex(mutex) , m_locked(false){ lock(); }
~ScopedLockImpl(){ unlock(); }
void lock(){ if(!m_locked){ m_mutex.lock(); m_locked = true; } }
void unlock(){ if(m_locked){ m_mutex.unlock(); m_locked = false; } }
private: T& m_mutex; bool m_locked; };
|
读写锁RWMutex
RAII封装pthread_rwlock_t
读写锁可实现高并发读,支持多个线程同时读取,**适合读多写少的场景**
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| class RWMutex: public Noncopyable{ public: using ReadLock = ReadScopedLockImpl<RWMutex>; using WriteLock = WriteScopedLockImpl<RWMutex>;
RWMutex(){ pthread_rwlock_init(&m_mutex, nullptr); }
~RWMutex(){ pthread_rwlock_destroy(&m_mutex); }
void rdlock(){ pthread_rwlock_rdlock(&m_mutex); }
void wrlock(){ pthread_rwlock_wrlock(&m_mutex); }
void unlock(){ pthread_rwlock_unlock(&m_mutex); }
private: pthread_rwlock_t m_mutex; };
|
读写局部锁
接收一个锁(一般是一个RWMutex锁),构造时自动上锁,析构时自动解锁
使用atomic变量来保证对m_locked
操作的线程安全性
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
| template<typename T> struct ReadScopedLockImpl{ public: ReadScopedLockImpl(T& mutex) : m_mutex(mutex) , m_locked(false){ lock(); }
~ReadScopedLockImpl(){ unlock(); }
void lock(){ if(!m_locked){ m_mutex.rdlock(); m_locked = true; } }
void unlock(){ if(m_locked){ m_mutex.unlock(); m_locked = false; } }
private: T& m_mutex; std::atomic<bool> m_locked; };
template<typename T> struct WriteScopedLockImpl{ public: WriteScopedLockImpl(T& mutex) : m_mutex(mutex) , m_locked(false){ lock(); }
~WriteScopedLockImpl(){ unlock(); }
void lock(){ if(!m_locked){ m_mutex.wrlock(); m_locked = true; } }
void unlock(){ if(m_locked){ m_mutex.unlock(); m_locked = false; } }
private: T& m_mutex; std::atomic<bool> m_locked; };
|
互斥量Mutex
RAII封装pthread_mutex_t
互斥量与读写锁的对比:
- 读写锁:可实现高并发读,支持多个线程同时读取,适合读多写少的场景
- 互斥量:更简单高效,适合 读写均衡或写多读少 的场景
- 如果初始需求不明确, 优先使用互斥量,后续可以根据性能瓶颈替换为读写锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| class Mutex{ public: using Lock = ScopedLockImpl<Mutex>;
Mutex(){ pthread_mutex_init(&m_mutex, 0); }
~Mutex(){ pthread_mutex_destroy(&m_mutex); }
void lock(){ pthread_mutex_lock(&m_mutex); }
void unlock(){ pthread_mutex_unlock(&m_mutex); }
private: pthread_mutex_t m_mutex; };
|
自旋锁SpinLock
RAII封装pthread_spinlock_t
自旋锁使锁被占用时,线程进入忙等状态,所以自旋锁适合 短时间持有锁的场景
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| class SpinLock{ public: using Lock = ScopedLockImpl<SpinLock>;
SpinLock(){ pthread_spin_init(&m_mutex, 0); }
~SpinLock(){ pthread_spin_destroy(&m_mutex); }
void lock(){ pthread_spin_lock(&m_mutex); }
void unlock(){ pthread_spin_unlock(&m_mutex); }
private: pthread_spinlock_t m_mutex; };
|