为什么要用Message Queue
解耦
在项目启动之初来预测将来项目会碰到什么需求,是极其困难的。消息队列在处理过程中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口。这允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束
冗余
有些情况下,处理数据的过程会失败。除非数据被持久化,否则将造成丢失。消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。在被许多消息队列所采用的”插入-获取-删除”范式中,在把一个消息从队列中删除之前,需要你的处理过程明确的指出该消息已经被处理完毕,确保你的数据被安全的保存直到你使用完毕。
扩展性
因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的;只要另外增加处理过程即可。不需要改变代码、不需要调节参数。扩展就像调大电力按钮一样简单。
灵活性 & 峰值处理能力
在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见;如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。
可恢复性
当体系的一部分组件失效,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。而这种允许重试或者延后处理请求的能力通常是造就一个略感不便的用户和一个沮丧透顶的用户之间的区别。
送达保证
消息队列提供的冗余机制保证了消息能被实际的处理,只要一个进程读取了该队列即可。在此基础上,IronMQ提供了一个”只送达一次”保证。无论有多少进程在从队列中领取数据,每一个消息只能被处理一次。这之所以成为可能,是因为获取一个消息只是”预定”了这个消息,暂时把它移出了队列。除非客户端明确的表示已经处理完了这个消息,否则这个消息会被放回队列中去,在一段可配置的时间之后可再次被处理。
顺序保证
在大多使用场景下,数据处理的顺序都很重要。消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。IronMO保证消息通过FIFO(先进先出)的顺序来处理,因此消息在队列中的位置就是从队列中检索他们的位置。
缓冲
在任何重要的系统中,都会有需要不同的处理时间的元素。例如,加载一张图片比应用过滤器花费更少的时间。消息队列通过一个缓冲层来帮助任务最高效率的执行—写入队列的处理会尽可能的快速,而不受从队列读的预备处理的约束。该缓冲有助于控制和优化数据流经过系统的速度。
理解数据流
在一个分布式系统里,要得到一个关于用户操作会用多长时间及其原因的总体印象,是个巨大的挑战。消息系列通过消息被处理的频率,来方便的辅助确定那些表现不佳的处理过程或领域,这些地方的数据流都不够优化。
异步通信
很多时候,你不想也不需要立即处理消息。消息队列提供了异步处理机制,允许你把一个消息放入队列,但并不立即处理它。你想向队列中放入多少消息就放多少,然后在你乐意的时候再去处理它们。
Messagequeue 的数据结构是什么?
基础数据结构中“先进先出”的一种数据结构
3、如何在子线程中创建 Handler?
在子线程中创建handler,要确保子线程有Looper,UI线程默认包含Looper
我们需要用到一个特殊类
HandlerThread
这个类可以轻松的创建子线程handler
创建步骤:
1: 创建一个HandlerThread,即创建一个包含Looper的线程
HandlerThread的构造函数有两个
public HandlerThread(String name) {
super(name);
mPriority = Process.THREAD_PRIORITY_DEFAULT;
}
/**
* Constructs a HandlerThread.
* @param name
* @param priority The priority to run the thread at. The value supplied must be from
* {@link android.os.Process} and not from java.lang.Thread.
*/
public HandlerThread(String name, int priority) {
super(name);
mPriority = priority;
}
这里我们使用第一个就好:
HandlerThread handlerThread=new HandlerThread("xuan");
handlerThread.start();//创建HandlerThread后一定要记得start();
通过HandlerThread的getLooper方法可以获取Looper
Looper looper=handlerThread.getLooper();
通过Looper我们就可以创建子线程的handler了
Handlr handler=new Handler(looper);
通过该handler发送消息,就会在子线程执行;
提示:如果要handlerThread停止:handlerThread.quit();
完整测试代码:
HandlerThread hanlerThread = new HandlerThread("子线程");
hanlerThread.start();
final Handler handler = new Handler(hanlerThread.getLooper()) {
@Override
public void handleMessage(Message msg) {
super.handleMessage(msg);
Log.d("----->", "线程:" + Thread.currentThread().getName());
}
};
findViewById(R.id.bt).setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
handler.sendEmptyMessage(100);
}
});
结果:
12-24 10:13:15.881 5024-5052/gitosctest.gitosc_studyproject D/----->: 线程:子线程
像在intentService(子线程)中,如果要回掉在UI线程怎么办呢?
new Handler(getMainLooper()).post(new Runnable() {
@Override
public void run() {
// person.getName() Realm objects can only be accessed on the thread they were created.
Toast.makeText(getApplicationContext(), "Loaded Person from broadcast-receiver->intent-service: " + info, Toast.LENGTH_LONG).show();
}
});
4、Handler post 方法原理?
一.源码分析
1.点进去看postDelayed()中的方法。里面调用sendMessageDelayed方法,和post() 里面调用的方法一样。
public final boolean postDelayed(Runnable r, long delayMillis)
{
return sendMessageDelayed(getPostMessage(r), delayMillis);
}
2.我们再点进去看下sendMessageDelayed()方法,
public final boolean sendMessageDelayed(Message msg, long delayMillis)
{
if (delayMillis < 0) {
delayMillis = 0;
}
return sendMessageAtTime(msg, SystemClock.uptimeMillis() + delayMillis);
}
里面调用了sendMessageAtTime(),这里的SystemClock.uptimeMillis()是获取系统从开机启动到现在的时间,期间不包括休眠的时间,这里获得到的时间是一个相对的时间,而不是通过获取当前的时间(绝对时间)。
而之所以使用这种方式来计算时间,而不是获得当前currenttime来计算,在于handler会受到阻塞,挂起状态,睡眠等,这些时候是不应该执行的;如果使用绝对时间的话,就会抢占资源来执行当前handler的内容,显然这是不应该出现的情况,所以要避免。
3.点进sendMessageAtTime()方法看看
public boolean sendMessageAtTime(Message msg, long uptimeMillis) {
MessageQueue queue = mQueue;
if (queue == null) {
RuntimeException e = new RuntimeException(
this + " sendMessageAtTime() called with no mQueue");
Log.w("Looper", e.getMessage(), e);
return false;
}
return enqueueMessage(queue, msg, uptimeMillis);
}
追到这里依然没有看到,他在存放的时候有什么不同,但是显然证实了消息不是延迟放进MessageQueen的,那是肿么处理的,是在轮训的时候处理的吗?,
4.我们点进Looper中看一下,主要代码,Looper中的looper调用了MessageQueen中的next方法,难道是在next()方法中处理的?
public static void loop() {
···
for (;;) {
Message msg = queue.next(); // might block
if (msg == null) {
// No message indicates that the message queue is quitting.
return;
}
···
5.我们点进MessageQueen中的next()方法
for (;;) {
if (nextPollTimeoutMillis != 0) {
Binder.flushPendingCommands();
}
nativePollOnce(ptr, nextPollTimeoutMillis);
···
if (msg != null) {
if (now < msg.when) {
// Next message is not ready. Set a timeout to wake up when it is ready.
nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
} else {
// Got a message.
mBlocked = false;
if (prevMsg != null) {
prevMsg.next = msg.next;
} else {
mMessages = msg.next;
}
msg.next = null;
if (DEBUG) Log.v(TAG, "Returning message: " + msg);
msg.markInUse();
return msg;
}
} else {
// No more messages.
nextPollTimeoutMillis = -1;
}
···
}}
很贴心的给出了注释解释“ Next message is not ready. Set a timeout to wake up when it is ready.”,翻译“下一条消息尚未准备好。设置一个超时,以便在准备就绪时唤醒。”
when就是uptimeMillis, for (;;) 相当于while(true),如果头部的这个Message是有延迟而且延迟时间没到的(now < msg.when),不返回message 而且会计算一下时间(保存为变量nextPollTimeoutMillis),然后再循环的时候判断如果这个Message有延迟,就调用nativePollOnce(ptr, nextPollTimeoutMillis)进行阻塞。nativePollOnce()的作用类似与object.wait()。得出结论是通过阻塞实现的。
6.但是如果在阻塞这段时间里有无延迟message又加入MessageQueen中又是怎么实现立即处理这个message的呢?,我们看MessageQueen中放入消息enqueueMessage()方法
boolean enqueueMessage(Message msg, long when) {
if (msg.target == null) {
throw new IllegalArgumentException("Message must have a target.");
}
if (msg.isInUse()) {
throw new IllegalStateException(msg + " This message is already in use.");
}
synchronized (this) {
if (mQuitting) {
IllegalStateException e = new IllegalStateException(
msg.target + " sending message to a Handler on a dead thread");
Log.w(TAG, e.getMessage(), e);
msg.recycle();
return false;
}
msg.markInUse();
msg.when = when;
Message p = mMessages;
boolean needWake;
if (p == null || when == 0 || when < p.when) {
// New head, wake up the event queue if blocked.
msg.next = p;
mMessages = msg;
needWake = mBlocked;
} else {
// Inserted within the middle of the queue. Usually we don't have to wake
// up the event queue unless there is a barrier at the head of the queue
// and the message is the earliest asynchronous message in the queue.
needWake = mBlocked && p.target == null && msg.isAsynchronous();
Message prev;
for (;;) {
prev = p;
p = p.next;
if (p == null || when < p.when) {
break;
}
if (needWake && p.isAsynchronous()) {
needWake = false;
}
}
msg.next = p; // invariant: p == prev.next
prev.next = msg;
}
// We can assume mPtr != 0 because mQuitting is false.
if (needWake) {
nativeWake(mPtr);
}
}
return true;
}
在这里p 是现在消息队列中的头部消息,我们看到| when < p.when 的时候它交换了放入message与原来消息队列头部P的位置,并且 needWake = mBlocked; (在next()中当消息为延迟消息的时候mBlocked=true),继续向下看 当needWake =true的时候nativeWake(mPtr)(唤起线程)
一切都解释的通了,如果当前插入的消息不是延迟message,或比当前的延迟短,这个消息就会插入头部并且唤起线程来
二.整理
我们把我们跟踪的所有信息整理下
1.消息是通过MessageQueen中的enqueueMessage()方法加入消息队列中的,并且它在放入中就进行好排序,链表头的延迟时间小,尾部延迟时间最大
2.Looper.loop()通过MessageQueue中的next()去取消息
3.next()中如果当前链表头部消息是延迟消息,则根据延迟时间进行消息队列会阻塞,不返回给Looper message,知道时间到了,返回给message
4.如果在阻塞中有新的消息插入到链表头部则唤醒线程
5.Looper将新消息交给回调给handler中的handleMessage后,继续调用MessageQueen的next()方法,如果刚刚的延迟消息还是时间未到,则计算时间继续阻塞
三总结
handler.postDelay() 的实现 是通过MessageQueue中执行时间顺序排列,消息队列阻塞,和唤醒的方式结合实现的。
如果真的是通过延迟将消息放入到MessageQueen中,那放入多个延迟消息就要维护多个定时器,
5、Android消息机制的原理及源码解析
一、消息机制概述
1.消息机制的简介
在Android中使用消息机制,我们首先想到的就是Handler。没错,Handler是Android消息机制的上层接口。Handler的使用过程很简单,通过它可以轻松地将一个任务切换到Handler所在的线程中去执行。通常情况下,Handler的使用场景就是更新UI。
如下就是使用消息机制的一个简单实例:
public class Activity extends android.app.Activity {
private Handler mHandler = new Handler(){
@Override
public void handleMessage(Message msg) {
super.handleMessage(msg);
System.out.println(msg.what);
}
};
@Override
public void onCreate(Bundle savedInstanceState, PersistableBundle persistentState) {
super.onCreate(savedInstanceState, persistentState);
setContentView(R.layout.activity_main);
new Thread(new Runnable() {
@Override
public void run() {
...............耗时操作
Message message = Message.obtain();
message.what = 1;
mHandler.sendMessage(message);
}
}).start();
}}
在子线程中,进行耗时操作,执行完操作后,发送消息,通知主线程更新UI。这便是消息机制的典型应用场景。我们通常只会接触到Handler和Message来完成消息机制,其实内部还有两大助手来共同完成消息传递。
2.消息机制的模型
消息机制主要包含:MessageQueue,Handler和Looper这三大部分,以及Message,下面我们一一介绍。
Message:需要传递的消息,可以传递数据;
MessageQueue:消息队列,但是它的内部实现并不是用的队列,实际上是通过一个单链表的数据结构来维护消息列表,因为单链表在插入和删除上比较有优势。主要功能向消息池投递消息(MessageQueue.enqueueMessage)和取走消息池的消息(MessageQueue.next);
Handler:消息辅助类,主要功能向消息池发送各种消息事件(Handler.sendMessage)和处理相应消息事件(Handler.handleMessage);
Looper:不断循环执行(Looper.loop),从MessageQueue中读取消息,按分发机制将消息分发给目标处理者。
3.消息机制的架构
消息机制的运行流程:在子线程执行完耗时操作,当Handler发送消息时,将会调用MessageQueue.enqueueMessage,向消息队列中添加消息。当通过Looper.loop开启循环后,会不断地从线程池中读取消息,即调用MessageQueue.next,然后调用目标Handler(即发送该消息的Handler)的dispatchMessage方法传递消息,然后返回到Handler所在线程,目标Handler收到消息,调用handleMessage方法,接收消息,处理消息。
[图片上传失败...(image-d41533-1609255156436)]
MessageQueue,Handler和Looper三者之间的关系:每个线程中只能存在一个Looper,Looper是保存在ThreadLocal中的。主线程(UI线程)已经创建了一个Looper,所以在主线程中不需要再创建Looper,但是在其他线程中需要创建Looper。每个线程中可以有多个Handler,即一个Looper可以处理来自多个Handler的消息。 Looper中维护一个MessageQueue,来维护消息队列,消息队列中的Message可以来自不同的Handler。
[图片上传失败...(image-95370c-1609255156436)]
下面是消息机制的整体架构图,接下来我们将慢慢解剖整个架构。
[图片上传失败...(image-d23936-1609255156436)]
从中我们可以看出: Looper有一个MessageQueue消息队列; MessageQueue有一组待处理的Message; Message中记录发送和处理消息的Handler; Handler中有Looper和MessageQueue。