为何要使用 Java 线程同步?
当多个线程同时操作一个可共享的资源变量时,将会导致数据不准确,相互之间产生冲突,因此加入同步锁以避免在该线程没有完成操作之前,被其他线程的调用,从而保证了该变量的唯一性和准确性。
Java 中提供了很多线程同步操作,比如:synchronized
关键字、wait/notifyAll
、ReentrantLock
、Condition
、一些并发包下的工具类、Semaphore
,ThreadLocal
、AbstractQueuedSynchronizer
等。本文主要说明一下这几种同步方式的使用及优劣。
ReentrantLock
可重入锁
对于同一个线程,可以继续调用加锁的方法,而不会被挂起。可重入锁内部维护一个计数器,对于同一个线程调用 lock 方法,计数器 +1,调用 unlock 方法,计数器-1。怎么理解呢?看看下面的例子:
private ReentrantLock lock = new ReentrantLock();
public void execute() {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + " lock!");
try {
anotherLock();
Thread.sleep(5000l);
} catch (InterruptedException e) {
System.err.println(Thread.currentThread().getName() + " interrupted");
Thread.currentThread().interrupt();
}
} finally {
lock.unlock();
}
}
public void anotherLock() {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + " lock again!");
} finally {
lock.unlock();
}
}
输出:
Thread-0 lock!
Thread-0 lock again!
在一个加锁方法 execute()
中调用另外一个加锁方法 anotherLock() 并不会被挂起(不用等待锁,就不需要被挂起),可以直接调用(调用 execute 方法时计数器+1,然后内部又调用了 anotherLock
方法,计数器 +1,变成了2)。
synchronized
class MainActivity : AppCompatActivity() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
Thread {
execute()
}.apply {
name = "thread-A"
}.start()
Thread {
execute()
}.apply {
name = "thread-B"
}.start()
}
@Synchronized
fun execute() {
Log.i("WWE", "${Thread.currentThread().name} -> synchronized called")
try {
anotherSynchronized()
Thread.sleep(1500)
} catch (ex: InterruptedException) {
Thread.currentThread().interrupt();
}
}
@Synchronized
fun anotherSynchronized() {
Log.i("WWE", "${Thread.currentThread().name} -> anotherSynchronized called")
}
}
使用 synchronized
代码块同步关键代码即可,没有必要同步整个方法,同步是一种高开销的操作,因此应该尽量减少同步的内容。
关于 Lock
对象和 synchronized
两种锁选择的考量:
1,最好两个都不用,使用 java.util.concurrent
包提供的机制,能够帮助用户处理所有与锁相关的代码。
2,如果 synchronized
关键字能满足用户的需求,就用 synchronized
,因为它能简化代码。
3,如果需要更高级的功能,就用 ReentrantLock
类,此时要注意及时释放锁,否则会出现死锁,通常在 finally
代码释放锁。
ReentrantLock
有提供tryLock
方法,可以设置超时时间,如果超过了这个时间还没有获取到锁,就会放弃。ReentrantLock
可以使用多个Condition
,可以中断一个试图获得锁的线程,ReentrantLock
可以选择公平锁和非公平锁,ReentrantLock
可以获得正在等待线程的个数,计数器等;
Condition
条件对象
对于一个已经拿到了 Lock 锁的线程,如果该线程需要等待某个条件才会执行,这种情况就考虑使用 Condition
条件对象。
Condition
可以替代传统的线程间通信,用 await()
替换 wait()
,用 signal()
替换 notify()
,用 signalAll()
替换 notifyAll()
。
为什么方法名不直接叫
wait()/notify()/nofityAll()
?因为 Object 的这几个方法是 final 的,不可重写!
class MainActivity : AppCompatActivity() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
val lock = ReentrantLock()
val condition = lock.newCondition()
Thread {
lock.lock()
try {
// do sth
try {
condition.await()
Log.i("WWE", "${Thread.currentThread().name} -> i waked up, more strong!")
} catch (ex: InterruptedException) {
Thread.currentThread().interrupt()
}
} finally {
lock.unlock()
}
}.apply {
name = "thread-A"
}.start()
Thread {
lock.lock()
try {
// do sth
try {
Thread.sleep(3000)
Log.i("WWE", "${Thread.currentThread().name} -> wake up from dream")
} catch (ex: InterruptedException) {
Thread.currentThread().interrupt()
}
condition.signalAll()
} finally {
lock.unlock()
}
}.apply {
name = "thread-B"
}.start()
}
}
输出:
2022-04-22 21:32:20.790 13761-13786/com.dev I/WWE: thread-B -> wake up from dream
2022-04-22 21:32:20.790 13761-13785/com.dev I/WWE: thread-A -> i waked up, more strong!
这个例子中 thread-A 执行到 condition.await()
时,thread-A 会被挂起,直到thread-B 调用了 condition.signalAll()
方法之后,thread-A 才会重新被激活执行。
这里需要注意的是 thread-A 调用 Condition 的
await()
方法之后,thread-A 线程释放锁,然后马上加入到 Condition 的等待队列中,由于 thread-A 释放了锁,thread-B 获得锁并执行,thread-B 执行signalAll()
方法之后,Condition中的等待队列 thread-A 被取出并加入到 AQS 中,接下来 thread-B 执行完毕之后释放锁,由于 thread-A 已经在 AQS 的等待队列中,所以 thread-A 被唤醒,继续执行。
Condition
是被绑定到 Lock
上的,要创建一个 Lock
的 Condition
必须用 newCondition()
方法。传统线程的通信方式,Condition
都可以实现。Condition
的强大之处在于它可以为多个线程间建立不同的 Condition
。
wait¬ify/notifyAll 方式
class MainActivity : AppCompatActivity() {
private val obj = Object()
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
Thread {
doWait()
}.apply {
name = "thread-A"
}.start()
Thread {
doNotify()
}.apply {
name = "thread-B"
}.start()
}
private fun doWait() {
synchronized(obj) {
try {
Log.i("WWE", "${Thread.currentThread().name} #doWait")
obj.wait()
Log.i("WWE", "${Thread.currentThread().name} wake up")
} catch (ex: InterruptedException) {
Thread.currentThread().interrupt();
}
}
}
private fun doNotify() {
synchronized(obj) {
try {
Log.i("WWE", "${Thread.currentThread().name} #doNotify")
Thread.sleep(3000)
obj.notifyAll()
Log.i("WWE", "${Thread.currentThread().name} notifyAll")
} catch (ex: InterruptedException) {
Thread.currentThread().interrupt();
}
}
}
}
输出:
2022-04-23 01:02:31.160 15948-15987/com.dev I/WWE: thread-A doWait()
2022-04-23 01:02:31.161 15948-15988/com.dev I/WWE: thread-B doNotify()
2022-04-23 01:02:34.163 15948-15988/com.dev I/WWE: thread-B notifyAll
2022-04-23 01:02:34.163 15948-15987/com.dev I/WWE: thread-A wake up
这里需要注意的是 调用 wait/notifyAll
方法的时候一定要获得当前线程的锁,否则会发生 IllegalMonitorStateException
异常。
thread.join() 方法
class MainActivity : AppCompatActivity() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
val thread = Thread {
Log.i("WWE", "${Thread.currentThread().name} run")
}.apply {
name = "thread-A"
}
thread.start()
try {
thread.join()
} catch (ex: InterruptedException) {
ex.printStackTrace()
}
Log.i("WWE", "${Thread.currentThread().name} run")
}
}
输出:
2022-04-23 01:10:06.870 16135-16159/com.dev I/WWE: thread-A run
2022-04-23 01:10:06.870 16135-16135/com.dev I/WWE: main run
Thread.yield() 方法
Yield 方法可以暂停当前正在执行的线程对象,让其它有相同优先级的线程执行。它是一个静态方法而且 只保证当前线程放弃 CPU 占用,而不能保证使其它线程一定能占用 CPU,执行
yield()
的线程有可能在进入到暂停状态后马上又被执行。
class MainActivity : AppCompatActivity() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
Thread {
doSth()
}.apply {
name = "Thread-A"
}.start()
Thread {
doSth()
}.apply {
name = "Thread-B"
}.start()
Thread.sleep(3000)
Log.i("WWE", "${Thread.currentThread().name} run")
}
@Synchronized
private fun doSth() {
for(i in 0..3) {
Log.i("WWE", "${Thread.currentThread().name} run")
if("Thread-A" == Thread.currentThread().name && i == 1) {
Thread.yield()
}
}
}
}
Thread.sleep() 方法
在指定的时间内无法被唤醒,同时也不会释放对象锁(如果当前已经持有锁),该方法告诉操作系统在指定时间内不需为该线程分配执行时间片,实际上,调用 sleep()
方法时并不要求持有任何锁,也就不需要包裹在 synchronized
中。
class MainActivity : AppCompatActivity() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
for (i in 0 until 2) {
Thread {
try {
Log.i("WWE", "${Thread.currentThread().name} before")
Thread.sleep(5000)
Log.i("WWE", "${Thread.currentThread().name} after")
} catch (ex: InterruptedException) {
ex.printStackTrace()
}
}.apply {
name = "Thread-$i"
}.start()
}
Log.i("WWE", "${Thread.currentThread().name} run")
}
}
输出:
2022-04-23 02:48:45.438 19188-19188/com.dev I/WWE: main run
2022-04-23 02:48:45.438 19188-19211/com.dev I/WWE: Thread-0 before
2022-04-23 02:48:45.439 19188-19212/com.dev I/WWE: Thread-1 before
2022-04-23 02:48:50.441 19188-19212/com.dev I/WWE: Thread-1 after
2022-04-23 02:48:50.441 19188-19211/com.dev I/WWE: Thread-0 after
ThreadLocal
ThreadLocal
是一种把变量放到线程本地的方式来实现线程同步的。ThreadLocal
与同步机制都是为了解决多线程中相同变量的访问冲突问题。
class MainActivity : AppCompatActivity() {
// SimpleDateFormat 不是一个线程安全的类,可以使用 ThreadLocal 实现同步
private val dateFormatThreadLocal = object : ThreadLocal<SimpleDateFormat>() {
override fun initialValue(): SimpleDateFormat? {
return SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
}
}
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
Thread {
Log.i(
"WWE", "${Thread.currentThread().name} -> ${
dateFormatThreadLocal.get().format(
Date()
)
}"
)
}.apply {
name = "thread-A"
}.start()
Thread {
Log.i(
"WWE", "${Thread.currentThread().name} -> ${
dateFormatThreadLocal.get().format(
Date()
)
}"
)
}.apply {
name = "thread-B"
}.start()
}
}
输出:
2022-04-23 01:48:46.363 17102-17127/com.dev I/WWE: thread-A -> 2022-04-23 01:48:46
2022-04-23 01:48:46.364 17102-17128/com.dev I/WWE: thread-B -> 2022-04-23 01:48:46
Semaphore 信号量
Semaphore 用于控制在同一个时间允许访问线程的个数,保证线程可以被合理的使用,可以使用构造器初始化同一时间允许被访问线程的个数:
class MainActivity : AppCompatActivity() {
private val semaphore = Semaphore(2)
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
for (i in 0 until 5) {
Thread {
try {
semaphore.acquire()
Log.i("WWE", "${Thread.currentThread().name} run")
Thread.sleep(5000)
semaphore.release()
} catch (ex: InterruptedException) {
ex.printStackTrace()
}
}.apply {
name = "Thread-$i"
}.start()
}
}
}
输出:
2022-04-23 02:33:55.669 18447-18474/com.dev I/WWE: Thread-1 run
2022-04-23 02:33:55.669 18447-18476/com.dev I/WWE: Thread-3 run
2022-04-23 02:34:00.670 18447-18475/com.dev I/WWE: Thread-2 run
2022-04-23 02:34:00.670 18447-18473/com.dev I/WWE: Thread-0 run
2022-04-23 02:34:05.671 18447-18477/com.dev I/WWE: Thread-4 run
可以看出同一时间内,只有2个线程可以被同时访问,因为构造函数里传的是2。
CountDownLatch
CountDownLatch
是一个计数器,它的构造方法中需要设置一个数值,用来设定计数的次数。每次调用 countDown()
方法之后,这个计数器都会减去1,CountDownLatch
会一直阻塞着调用 await()
方法的线程,直到计数器的值变为0。
class MainActivity : AppCompatActivity() {
private val countDownLatch = CountDownLatch(5)
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
for(i in 0 until 5) {
Thread {
Log.i("WWE", "${Thread.currentThread().name} sleep before")
try {
Thread.sleep(5000)
} catch (ex: InterruptedException) {
ex.printStackTrace()
}
Log.i("WWE", "${Thread.currentThread().name} sleep after")
countDownLatch.countDown()
}.apply {
name = "Thread-$i"
}.start()
}
try {
countDownLatch.await()
} catch (ex: InterruptedException) {
ex.printStackTrace()
}
Log.i("WWE", "${Thread.currentThread().name} run")
}
}
输出:
2022-04-23 05:30:20.426 20564-20588/com.dev I/WWE: Thread-0 sleep before
2022-04-23 05:30:20.426 20564-20592/com.dev I/WWE: Thread-4 sleep before
2022-04-23 05:30:20.426 20564-20589/com.dev I/WWE: Thread-1 sleep before
2022-04-23 05:30:20.426 20564-20591/com.dev I/WWE: Thread-3 sleep before
2022-04-23 05:30:20.426 20564-20590/com.dev I/WWE: Thread-2 sleep before
2022-04-23 05:30:25.429 20564-20589/com.dev I/WWE: Thread-1 sleep after
2022-04-23 05:30:25.429 20564-20590/com.dev I/WWE: Thread-2 sleep after
2022-04-23 05:30:25.429 20564-20588/com.dev I/WWE: Thread-0 sleep after
2022-04-23 05:30:25.430 20564-20592/com.dev I/WWE: Thread-4 sleep after
2022-04-23 05:30:25.430 20564-20591/com.dev I/WWE: Thread-3 sleep after
2022-04-23 05:30:25.430 20564-20564/com.dev I/WWE: main run
当线程调用 CountDownLatch 的 await 方法时,便会尝试获取 共享锁
,不过一开始通常获取不到锁,于是线程被阻塞。共享锁
可获取到的条件是 锁计数器
的值为 0,而 锁计数器
的初始值为 count,当每次调用 CountDownLatch 对象的 countDown 方法时,也可以把 锁计数器
- 1。通过这种方式,调用 count 次 countDown 方法之后,锁计数器
就为 0 了,于是之前等待的线程就会继续运行了,并且此时如果再有线程想调用 await
方法时也会被立刻放行,不会再去做任何阻塞操作了。
使用原子变量实现线程同步
什么是原子操作呢?
原子操作就是指将
读取变量
、修改变量
、保存变量
看成一个整体来操作,即这几种行为要么同时完成,要么都不完成。
在 java.util.concurrent.atomic
包中提供了创建原子类型变量的工具类,使用该类可以简化线程同步。比如:其中 AtomicInteger
以原子方式更新 int
的值:
class Bank {
private AtomicInteger account = new AtomicInteger(100);
public AtomicInteger getAccount() {
return account;
}
public void save(int money) {
account.addAndGet(money);
}
}
AbstractQueuedSynchronizer
AQS 是很多同步工具类的基础,比如:ReentrantLock
里的公平锁和非公平锁,Semaphore
里的公平锁和非公平锁,CountDownLatch
里的锁等他们的底层都是使用 AbstractQueuedSynchronizer
完成的。在实际开发当中,应当尽量远离底层结构。下面基于 AbstractQueuedSynchronizer
自定义实现一个独占锁。
class MySynchronizer : AbstractQueuedSynchronizer() {
override fun tryAcquire(arg: Int): Boolean {
if (compareAndSetState(0, 1)) {
exclusiveOwnerThread = Thread.currentThread()
return true
}
return false
}
override fun tryRelease(arg: Int): Boolean {
state = 0
exclusiveOwnerThread = null
return true
}
fun lock() {
acquire(1)
}
fun unlock() {
release(1)
}
}
class MainActivity : AppCompatActivity() {
private val mySynchronizer = MySynchronizer()
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
Thread {
mySynchronizer.lock()
try {
Log.i("WWE", "${Thread.currentThread().name} run before")
try {
Thread.sleep(5000)
} catch (ex: InterruptedException) {
Thread.currentThread().interrupt()
}
Log.i("WWE", "${Thread.currentThread().name} run after")
} finally {
mySynchronizer.unlock()
}
}.apply {
name = "Thread-A"
}.start()
Thread {
mySynchronizer.lock()
try {
Log.i("WWE", "${Thread.currentThread().name} run")
} finally {
mySynchronizer.unlock()
}
}.apply {
name = "Thread-B"
}.start()
Log.i("WWE", "${Thread.currentThread().name} run")
}
}
输出:
2022-04-23 07:26:52.498 22603-22603/com.dev I/WWE: main run
2022-04-23 07:26:52.498 22603-22628/com.dev I/WWE: Thread-B run
2022-04-23 07:26:52.498 22603-22627/com.dev I/WWE: Thread-A run before
2022-04-23 07:26:57.499 22603-22627/com.dev I/WWE: Thread-A run after
使用阻塞队列实现线程同步
LinkedBlockingQueue
是一个基于链表的队列,先进先出的顺序(FIFO),范围任意的 blocking queue。