SQLite是安卓数据持久化的重要手段. 为了便捷的操作sql, 衍生了很多ORM (Object/Relation Mapping 对象关系映射). Android端比较流行的有 GreenDao, Realm, LitePal等等.
Google在 AAC 中也推出了自己的 ORM 框架 Room. 除了基础的 runtime 包, apt注解包. 还提供了对 AAC LiveData 和老牌 RxJava 两种观察者模式框架的扩展.
dependencies {
def room_version = "2.2.0-rc01"
implementation "androidx.room:room-runtime:$room_version" //Room
//编译期注解, kotlin使用kapt代替annotationProcessor
annotationProcessor "androidx.room:room-compiler:$room_version"
kapt "androidx.room:room-compiler:$rroom_version"
// ktx扩展和协程
implementation "androidx.room:room-ktx:$room_version"
// Room的RxJava扩展
implementation "androidx.room:room-rxjava2:$room_version"
// Room的的Google guava扩展
implementation "androidx.room:room-guava:$room_version"
// Test helpers
testImplementation "androidx.room:room-testing:$room_version"
}
和别的 ORM 框架一样, Room 使用了大量编译时注解帮助我们生成模板代码 (Java/annotationProcesser 和 Kotlin/kapt). 包括主要的类注解 :
- DataBase : 数据库实例
- Dao : 数据访问对象, 维护了数据库增删改查的方法
- Entity : 映射表结构的对象
使用方法和别的 ORM 差异不大, 具体参考官方文档. 这里介绍的是 Room 的数据跨进程共享的实现.
ContentProvider
提到数据库跨进程, 首先想到的肯定是ContentProvider, 他用了两套C/S模型:
- 多进程操作数据库
多个进程(客户端)连接ContentProvider(服务端), ContentProvider 提供Binder实现给客户端调用. - 跨进程数据共享
当一个进程操作ContentProvider变更数据之后,可能希望其他进程能收到通知.
客户端通过getContentResolver().registerContentObserver()
注册ContentObserver, 他提供 Binder 并把 Binder 交给系统级别的服务 ContentService, 更新就能通过服务端分发到各个客户端.
Room
Room 也支持跨进程数据共享, 只需在构造对象的时候添加 enableMultiInstanceInvalidation()
选项
@Synchronized
fun getDataBase(): AppDataBase {
return Room.databaseBuilder(
App.instance(),
AppDataBase::class.java, "databasenameXXXX"
)
.enableMultiInstanceInvalidation()
.build()
}
他的实现原理是什么呢? 看官网的一段注释 :
Note: If your app runs in a single process, you should follow the singleton design pattern when instantiating an AppDatabase object. Each RoomDatabase instance is fairly expensive, and you rarely need access to multiple instances within a single process.
如果你的APP运行在单进程中, 你应该设置 Room 为单例, 因为获取 RoomDatabase 实例的成本较高, 且很少需要在一个进程中创建多个实例.
If your app runs in multiple processes, include enableMultiInstanceInvalidation() in your database builder invocation. That way, when you have an instance of AppDatabase in each process, you can invalidate the shared database file in one process, and this invalidation automatically propagates to the instances of AppDatabase within other processes.
如果你的APP是多进程的, 在构造 database 时配置enableMultiInstanceInvalidation()
选项, 那么不同进程就会根据同一个db文件创建属于各自进程的单独实例. 并且当一个进程中的实例失效(发生变化)时, 会自动将失效传播到别的进程中.
初始化
在 db.init()
初始化时, 将多进程的处理交给 InvalidationTracker 对象.
public abstract class RoomDatabase {
public static class Builder<T extends RoomDatabase> {
private boolean mMultiInstanceInvalidation;
@NonNull
public Builder<T> enableMultiInstanceInvalidation() {
mMultiInstanceInvalidation = mName != null;
return this;
}
public T build() {
....
//DatabaseConfiguration保存配置项
DatabaseConfiguration configuration =
new DatabaseConfiguration(....mMultiInstanceInvalidation....);
T db = Room.getGeneratedImplementation(mDatabaseClass, DB_IMPL_SUFFIX);
db.init(configuration); //初始化db
return db;
}
}
@CallSuper
public void init(@NonNull DatabaseConfiguration configuration) {
mOpenHelper = createOpenHelper(configuration);
....
//如果配置了多进程选项
if (configuration.multiInstanceInvalidation) {
//使用 mInvalidationTracker 来处理多进程的情况
mInvalidationTracker.startMultiInstanceInvalidation(configuration.context,
configuration.name);
}
}
}
InvalidationTracker 在apt帮我们生成的 RoomDatabase 实现 AppDataBase_Impl
中创建. 他和 RoomDatabase 对象是一对一的关系.
public final class AppDataBase_Impl extends AppDataBase {
@Override
protected InvalidationTracker createInvalidationTracker() {
final HashMap<String, String> _shadowTablesMap = new HashMap<String, String>(0);
HashMap<String, Set<String>> _viewTables = new HashMap<String, Set<String>>(0);
return new InvalidationTracker(this, _shadowTablesMap, _viewTables, "nodemodel","user_profile");
}
}
mInvalidationTracker.startMultiInstanceInvalidation
中创建了 MultiInstanceInvalidationClient 对象.
public class InvalidationTracker {
private MultiInstanceInvalidationClient mMultiInstanceInvalidationClient;
//name是数据库的名字, executor是一个异步的线程池
void startMultiInstanceInvalidation(Context context, String name) {
mMultiInstanceInvalidationClient = new MultiInstanceInvalidationClient(context, name, this,
mDatabase.getQueryExecutor());
}
}
在 MultiInstanceInvalidationClient
的构造方法中使用了 bindService()
连接MultiInstanceInvalidationService
, 这里使用了绑定服务来进行一个 Service 和多个进程的 Client 的IPC.
/**
* Handles all the communication from {@link RoomDatabase} and {@link InvalidationTracker} to
* {@link MultiInstanceInvalidationService}.
*/
class MultiInstanceInvalidationClient {
MultiInstanceInvalidationClient(Context context, String name,
InvalidationTracker invalidationTracker, Executor executor) {
mContext = context.getApplicationContext();
mName = name;
mInvalidationTracker = invalidationTracker;
mExecutor = executor;
mObserver = new InvalidationTracker.Observer(invalidationTracker.mTableNames) {
@Override
public void onInvalidated(@NonNull Set<String> tables) {
if (mStopped.get()) {
return;
}
try {
mService.broadcastInvalidation(mClientId,
tables.toArray(new String[0]));
} catch (RemoteException e) {
Log.w(Room.LOG_TAG, "Cannot broadcast invalidation", e);
}
}
@Override
boolean isRemote() {
return true;
}
};
Intent intent = new Intent(mContext, MultiInstanceInvalidationService.class);
mContext.bindService(intent, mServiceConnection, Context.BIND_AUTO_CREATE);
}
}
绑定服务介绍 :
创建提供绑定的服务时,您必须提供 IBinder
,进而提供编程接口,以便客户端使用此接口与服务进行交互. 您可以通过三种方法定义接口 :
Room 中使用的是 AIDL, 使用必须执行以下步骤:
- 创建 aidl 接口文件, 在文件内声明方法
- 编译时会根据 adil 生成 XXInterface.java 接口文件 , 内部类 XXInterface.Stub 实现了 IBinder 接口并声明了 aidl 内定义的方法
- Service 的
public IBinder onBind(Intent intent)
方法 , 返回一个 XXInterface.Stub 的实现类实例, 其定义了服务的远程过程调用 (RPC) 接口, 给客户端调用. - Client 调用
bindService()
连接 Service, 在连接成功的onServiceConnected(ComponentName name, IBinder service)
中将 Service 的 IBinder 转换为 XXInterface 对象. 通过这个对象就可以调用 Service 中定义的方法实现多个 Client 和 Service 的跨进程通信.
MultiInstanceInvalidationClient
对于同一个db文件, 每个进程可以创建自己的 RoomDatabase
对象. 在 RoomDataBase
中创建了 InvalidationTracker
, 在 InvalidationTracker
创建了 MultiInstanceInvalidationClient
.
Client 的构造方法中通过 bindService()
绑定服务,Service#Binder 向 Client 提供跨进程调用的三个方法 :
- 绑定服务成功后调用 Service#Binder 的
service.registerCallback()
方法. - 在构造中的 mObserver 里调用
mService.broadcastInvalidation()
. - 在客户端的 stop() 方法中调用
service.unregisterCallback(mCallback, mClientId)
class MultiInstanceInvalidationClient {
@Nullable
IMultiInstanceInvalidationService mService; //Service中的Binder对象
MultiInstanceInvalidationClient(Context context, String name,
InvalidationTracker invalidationTracker, Executor executor) {
mContext = context.getApplicationContext();
mName = name;
mInvalidationTracker = invalidationTracker;
mExecutor = executor;
mObserver = new InvalidationTracker.Observer(invalidationTracker.mTableNames) {
@Override
public void onInvalidated(@NonNull Set<String> tables) {
if (mStopped.get()) {
return;
}
try {
//Client 跨进程调用 Service 的传播更新方法
mService.broadcastInvalidation(mClientId,
tables.toArray(new String[0]));
} catch (RemoteException e) {
Log.w(Room.LOG_TAG, "Cannot broadcast invalidation", e);
}
}
@Override
boolean isRemote() {
return true;
}
};
//绑定服务
Intent intent = new Intent(mContext, MultiInstanceInvalidationService.class);
mContext.bindService(intent, mServiceConnection, Context.BIND_AUTO_CREATE);
}
final ServiceConnection mServiceConnection = new ServiceConnection() {
@Override
public void onServiceConnected(ComponentName name, IBinder service) {
mService = IMultiInstanceInvalidationService.Stub.asInterface(service);
//连接服务端成功
mExecutor.execute(mSetUpRunnable);
}
@Override
public void onServiceDisconnected(ComponentName name) {
mExecutor.execute(mRemoveObserverRunnable);
mService = null;
mContext = null;
}
};
final Runnable mSetUpRunnable = new Runnable() {
@Override
public void run() {
try {
final IMultiInstanceInvalidationService service = mService;
if (service != null) {
//调用Service#Binder的 registerCallback() 方法
mClientId = service.registerCallback(mCallback, mName);
//mInvalidationTracker添加Observer
mInvalidationTracker.addObserver(mObserver);
}
} catch (RemoteException e) {
Log.w(Room.LOG_TAG, "Cannot register multi-instance invalidation callback", e);
}
}
};
final Runnable mRemoveObserverRunnable = new Runnable() {
@Override
public void run() {
//mInvalidationTracker移除Observer
mInvalidationTracker.removeObserver(mObserver);
}
};
}
MultiInstanceInvalidationService
MultiInstanceInvalidationService
作为服务端可以向多个进程中的客户端MultiInstanceInvalidationClient
提供绑定服务.
IMultiInstanceInvalidationService.aidl 作为 AIDL 接口, 定义了三个接口方法给 Service 实现 :
interface IMultiInstanceInvalidationService {
//注册回调
int registerCallback(IMultiInstanceInvalidationCallback callback, String name);
//反注册回调
void unregisterCallback(IMultiInstanceInvalidationCallback callback, int clientId);
//传播失效, 即进程间同步
oneway void broadcastInvalidation(int clientId, in String[] tables);
}
服务端中aidl方法的实现都加上了
synchronized
, 因为多个客户端调用服务端的方法存在竞态
1. registerCallback() 注册
registerCallback()
涉及到两个重要的变量 mCallbackList
和 mClientNames
.
-
mCallbackList
是个列表, 用于保存服务端对多个远程客户端的callBack. 整个通知更新的过程分为两步:
Client 通过 Service#Binder 和 Service 建立连接并调用 Service#Binder 的broadcastInvalidation()
方法让 Service 来传播更新.
Service 通知其他 Client 更新的过程也涉及到 IPC, 这次由每个 Client 提供 Client#Binder 供 Serivce 调用,mCallbackList
就是用于保存每个客户端的 Binder.
public class MultiInstanceInvalidationService extends Service {
final RemoteCallbackList<IMultiInstanceInvalidationCallback> mCallbackList =
new RemoteCallbackList<IMultiInstanceInvalidationCallback>() {
@Override
public void onCallbackDied(IMultiInstanceInvalidationCallback callback,
Object cookie) {
//如果Client的Binder挂掉了, 就从mClientNames移除
mClientNames.remove((int) cookie);
}
};
// Service 提供给Client的 Binder : IMultiInstanceInvalidationService.Stub
private final IMultiInstanceInvalidationService.Stub mBinder =
new IMultiInstanceInvalidationService.Stub() {
// Assigns a client ID to the client.
@Override
public int registerCallback(IMultiInstanceInvalidationCallback callback,
String name) {
if (name == null) {
return 0;
}
synchronized (mCallbackList) {
int clientId = ++mMaxClientId;
// Use the client ID as the RemoteCallbackList cookie.
if (mCallbackList.register(callback, clientId)) {
mClientNames.append(clientId, name);
return clientId;
} else {
--mMaxClientId;
return 0;
}
}
}
}
// 客户端提供Binder: IMultiInstanceInvalidationCallback.Stub 给服务端调用
class MultiInstanceInvalidationClient {
final IMultiInstanceInvalidationCallback mCallback =
new IMultiInstanceInvalidationCallback.Stub() {
@Override
public void onInvalidation(final String[] tables) {
mExecutor.execute(new Runnable() {
@Override
public void run() {
mInvalidationTracker.notifyObserversByTableNames(tables);
}
});
}
};
}
-
mClientNames
使用 SparseArrayCompat<String> 保存所有连接他的客户端名称, 调用mClientNames.append(clientId, name)
把客户端加入集合.
key : clientId 由Service分配, 每次+1.
value : clientName 就是 RoomDatabase 数据库的名字.
2. unregisterCallback() 反注册
从 mCallbackList
, mClientNames
集合移除对应的客户端.
@Override
public void unregisterCallback(IMultiInstanceInvalidationCallback callback,
int clientId) {
synchronized (mCallbackList) {
mCallbackList.unregister(callback);
mClientNames.remove(clientId);
}
}
3. broadcastInvalidation 传播更新(将一个客户端的变化传播到别的客户端)
callback
作为 Client#Binder, 当 Client 调用过 Service#Binder 的 registerCallback()
后, 就将 callBack
添加到了 Service 的 mCallbackList
中.
当 Client 调用 Service#Binder 的 broadcastInvalidation()
传播更新时, 遍历 mCallbackList
调用 Client#Binder 的 callback.onInvalidation()
就将更新的逻辑通知到了各个 Client 的 onInvalidation()
实现.
// Broadcasts table invalidation to other instances of the same database file.
// The broadcast is not sent to the caller itself.
@Override
public void broadcastInvalidation(int clientId, String[] tables) {
//服务端和客户端是1/N的关系, 存在竞态, 方法加锁
synchronized (mCallbackList) {
String name = mClientNames.get(clientId);
if (name == null) {
Log.w(Room.LOG_TAG, "Remote invalidation client ID not registered");
return;
}
int count = mCallbackList.beginBroadcast();
try {
//遍历mCallbackList
for (int i = 0; i < count; i++) {
int targetClientId = (int) mCallbackList.getBroadcastCookie(i);
String targetName = mClientNames.get(targetClientId);
//调用这个方法的客户端不需要参与同步, name不同即不是一个数据库文件的客户端不需要同步
if (clientId == targetClientId // This is the caller itself.
|| !name.equals(targetName)) { // Not the same file.
continue;
}
try {
//这里涉及到服务端调用客户端的过程, 也是通过aidl
IMultiInstanceInvalidationCallback callback =
mCallbackList.getBroadcastItem(i);
callback.onInvalidation(tables);
} catch (RemoteException e) {
Log.w(Room.LOG_TAG, "Error invoking a remote callback", e);
}
}
} finally {
mCallbackList.finishBroadcast();
}
}
}
遍历时过滤掉不需要同步的客户端
-
clientId == targetClientId
发起传播的客户端肯定不需要通知自己 -
!name.equals(targetName)
一个APP可以有多个本地db文件, 我们需要同步的是同一个db文件在不同进程中的 RoomDatabase 对象对应的MultiInstanceInvalidationClient
(他的name就是数据库的名字).
如果 name 不同, 意味着不是同一个db文件的Client, 不需要参与同步.
传播更新
传播更新由 MultiInstanceInvalidationService
调用 MultiInstanceInvalidationClient
中的 Binder 方法来实现, 同样是通过AIDL.
IMultiInstanceInvalidationCallback.aidl 只定义了一个方法 :
interface IMultiInstanceInvalidationCallback {
oneway void onInvalidation(in String[] tables);
}
MultiInstanceInvalidationClient
中的实现 :
class MultiInstanceInvalidationClient {
final IMultiInstanceInvalidationCallback mCallback =
new IMultiInstanceInvalidationCallback.Stub() {
@Override
public void onInvalidation(final String[] tables) {
mExecutor.execute(new Runnable() {
@Override
public void run() {
//将数据库中表的更新通知给观察者
mInvalidationTracker.notifyObserversByTableNames(tables);
}
});
}
};
}
所以 MultiInstanceInvalidationService # broadcastInvalidation()
→ MultiInstanceInvalidationClient # callback.onInvalidation()
→ mInvalidationTracker.notifyObserversByTableNames(tables)
, 最终更新会交给每个Client 对应的 InvalidationTracker 处理.
接收更新的MultiInstanceInvalidationClient
InvalidationTracker 中掉用 notifyObserversByTableNames(String... tables)
通知观察者的集合 mObserverMap
.
public class InvalidationTracker {
public void notifyObserversByTableNames(String... tables) {
synchronized (mObserverMap) {
//mObserverMap 中保存了当前进程的数据库观察者
for (Map.Entry<Observer, ObserverWrapper> entry : mObserverMap) {
//!entry.getKey().isRemote() 排除了Client构造方法内的Observer
if (!entry.getKey().isRemote()) {
//这里的tables就是当前进程修改过, 需要同步到别的进程的表
entry.getValue().notifyByTableNames(tables);
}
}
}
}
static class ObserverWrapper {
void notifyByTableNames(String[] tables) {
Set<String> invalidatedTables = null;
if (mTableNames.length == 1) {
for (String table : tables) {
if (table.equalsIgnoreCase(mTableNames[0])) {
// Optimization for a single-table observer
invalidatedTables = mSingleTableSet;
break;
}
}
} else {
ArraySet<String> set = new ArraySet<>();
for (String table : tables) { //遍历传播更新的Client修改过的表
for (String ourTable : mTableNames) { //遍历当前Client的所有表
if (ourTable.equalsIgnoreCase(table)) { //只把需要更新的表加入set集合
set.add(ourTable);
break;
}
}
}
if (set.size() > 0) {
invalidatedTables = set;
}
}
if (invalidatedTables != null) { //通知观察者更新表
mObserver.onInvalidated(invalidatedTables);
}
}
}
}
Q: 那么mObserverMap
中的 InvalidationTracker.Observer 在哪里被添加呢?**
A: InvalidationTracker.Observer 就是观察者, 他是一个抽象类, 在 LiveData 和 RxJava 两种观察者模式框架中都提供了他的实现.
我们以 RxJava 为例, 举个例子 :
@Dao
interface NodeModelDao {
@Query("SELECT * from nodemodel")
fun getAll(): Flowable<List<NodeModel>>
}
查看 kapt 编译后生成的 NodeModelDaoImpl.java 文件, 可以看到 Room 帮助我们在Callable
的 call()
方法中生成了查询的Sql操作.
public final class NodeModelDao_Impl implements NodeModelDao {
@Override
public Flowable<List<NodeModel>> getAll() {
final String _sql = "SELECT * from nodemodel";
final RoomSQLiteQuery _statement = RoomSQLiteQuery.acquire(_sql, 0);
return RxRoom.createFlowable(__db, new String[]{"nodemodel"}, new Callable<List<NodeModel>>() {
@Override
public List<NodeModel> call() throws Exception {
final Cursor _cursor = DBUtil.query(__db, _statement, false);
try {
.....
_item = new NodeModel(_tmpUid,_tmpId,_tmpName,_tmpTitle,_tmpTitleAlternative,_tmpUrl,_tmpTopics,_tmpHeader,_tmpFooter,_tmpIsCollected);
_result.add(_item);
return _result;
} finally {
_cursor.close();
}
}
}
}
查看 RxRoom.createFlowable
方法, 他的内部会创建一个 InvalidationTracker.Observer
实例 observer, 接着调用 database.getInvalidationTracker().addObserver(observer)
将 observer 添加到 InvalidationTracker 的 mObserverMap
中. observer 的 onInvalidated()
方法会调用 emitter.onNext(NOTHING)
再次发射, 发射后会调用 flatMapMaybe()
转换流为 maybe
返回给观察者, 而 maybe = Maybe.fromCallable(callable)
, Callable 内的 call()
实现就是上面提到的Sql查询操作.
public class RxRoom {
@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
public static <T> Flowable<T> createFlowable(final RoomDatabase database,
final String[] tableNames, final Callable<T> callable) {
Scheduler scheduler = Schedulers.from(database.getQueryExecutor());
final Maybe<T> maybe = Maybe.fromCallable(callable);
return createFlowable(database, tableNames)
.observeOn(scheduler)
.flatMapMaybe(new Function<Object, MaybeSource<T>>() {
@Override
public MaybeSource<T> apply(Object o) throws Exception {
return maybe;
}
});
}
public static Flowable<Object> createFlowable(final RoomDatabase database,
final String... tableNames) {
return Flowable.create(new FlowableOnSubscribe<Object>() {
@Override
public void subscribe(final FlowableEmitter<Object> emitter) throws Exception {
//创建InvalidationTracker.Observer实例
final InvalidationTracker.Observer observer = new InvalidationTracker.Observer(
tableNames) {
@Override
public void onInvalidated(@androidx.annotation.NonNull Set<String> tables) {
if (!emitter.isCancelled()) {
//再次发射
emitter.onNext(NOTHING);
}
}
};
if (!emitter.isCancelled()) {
database.getInvalidationTracker().addObserver(observer);
emitter.setDisposable(Disposables.fromAction(new Action() {
@Override
public void run() throws Exception {
database.getInvalidationTracker().removeObserver(observer);
}
}));
}
// emit once to avoid missing any data and also easy chaining
if (!emitter.isCancelled()) {
emitter.onNext(NOTHING);
}
}
}, BackpressureStrategy.LATEST);
}
}
当别的进程的 Client
修改了数据, Service
会将更新传播到所有进程相同名字的 Client
. 每个进程的 Client
对应的 InvalidationTracker 会将需要更新的表 tables 交给内部维护的集合 mObserverMap
处理.
InvalidationTracker.Observer 是个抽象类, 我们使用 LiveData/RxJava 作为 Room 查询的观察者实现时, 会实现 InvalidationTracker.Observer 对象并把它添加到 InvalidationTracker 的 mObserverMap
集合中. 遍历集合并调用 InvalidationTracker.Observer 的 onInvalidated()
实现, 就可以让上游再次发送消息, 再次触发下游观察者的订阅, 到这里更新的流程就结束了.
发起更新的MultiInstanceInvalidationClient
我们已经完成了 Service 通过 Client#Binder 向各个 Client 传播更新的IPC流程, 以及 每个 Client 的 InvalidationTracker 又通过 mObserverMap
通知自己的观察者的流程. 接下来我们只要知道 Service#broadcastInvalidation()
在什么情况下会被调用.
查看Client的构造方法, 这里初始化的 InvalidationTracker.Observer 的 onInvalidated()
中调用了 mService.broadcastInvalidation()
. 所以只需要知道这个 observer 的方法什么时候被调用.
MultiInstanceInvalidationClient(Context context, String name,
InvalidationTracker invalidationTracker, Executor executor) {
mContext = context.getApplicationContext();
mName = name;
mInvalidationTracker = invalidationTracker;
mExecutor = executor;
mObserver = new InvalidationTracker.Observer(invalidationTracker.mTableNames) {
@Override
public void onInvalidated(@NonNull Set<String> tables) {
if (mStopped.get()) {
return;
}
try {
mService.broadcastInvalidation(mClientId,
tables.toArray(new String[0]));
} catch (RemoteException e) {
Log.w(Room.LOG_TAG, "Cannot broadcast invalidation", e);
}
}
@Override
boolean isRemote() {
return true;
}
};
Intent intent = new Intent(mContext, MultiInstanceInvalidationService.class);
mContext.bindService(intent, mServiceConnection, Context.BIND_AUTO_CREATE);
}
查看apt编译后生成的XXDao_Impl, 可以看到包括插入,删除,修改等涉及到写的操作都是通过事务 transcation 来完成的. 这很好理解 写入操作要保证原子性.
public final class NodeModelDao_Impl implements NodeModelDao {
@Override
public void insertAll(List<NodeModel> userEntities) {
__db.beginTransaction();
try {
__insertionAdapterOfNodeModel.insert(userEntities);
__db.setTransactionSuccessful();
} finally {
__db.endTransaction();
}
}
@Override
public void deleteAll() {
final SupportSQLiteStatement _stmt = __preparedStmtOfDeleteAll.acquire();
__db.beginTransaction();
try {
_stmt.executeUpdateDelete();
__db.setTransactionSuccessful();
} finally {
__db.endTransaction();
__preparedStmtOfDeleteAll.release(_stmt);
}
}
}
RoomDatabase # endTransaction() 在事务结束的方法中, 调用了 mInvalidationTracker.refreshVersionsAsync()
public void endTransaction() {
mOpenHelper.getWritableDatabase().endTransaction();
if (!inTransaction()) {
// enqueue refresh only if we are NOT in a transaction. Otherwise, wait for the last
// endTransaction call to do it.
mInvalidationTracker.refreshVersionsAsync();
}
}
InvalidationTracker # refreshVersionsAsync() , 遍历 mObserverMap
, map里包含两种Observer
- Client 构造方法内用于IPC调用
mService.broadcastInvalidation()
的Observer - Client 内做查询操作的观察者的Observer
public void refreshVersionsAsync() {
// TODO we should consider doing this sync instead of async.
if (mPendingRefresh.compareAndSet(false, true)) {
mDatabase.getQueryExecutor().execute(mRefreshRunnable);
}
}
@VisibleForTesting
Runnable mRefreshRunnable = new Runnable() {
@Override
public void run() {
final Lock closeLock = mDatabase.getCloseLock();
boolean hasUpdatedTable = false;
if (hasUpdatedTable) {
synchronized (mObserverMap) {
for (Map.Entry<Observer, ObserverWrapper> entry : mObserverMap) {
entry.getValue().notifyByTableVersions(mTableInvalidStatus);
}
}
}
}
};
InvalidationTracker.ObserverWrapper # notifyByTableVersions()
void notifyByTableVersions(BitSet tableInvalidStatus) {
Set<String> invalidatedTables = null;
final int size = mTableIds.length;
for (int index = 0; index < size; index++) {
final int tableId = mTableIds[index];
if (tableInvalidStatus.get(tableId)) {
if (size == 1) {
// Optimization for a single-table observer
invalidatedTables = mSingleTableSet;
} else {
if (invalidatedTables == null) {
invalidatedTables = new ArraySet<>(size);
}
invalidatedTables.add(mTableNames[index]);
}
}
}
if (invalidatedTables != null) {
mObserver.onInvalidated(invalidatedTables);
}
}
验证
通过 android:process 我们可以最快的模拟跨进程的情况
<activity android:name=".ui.DetailActivity1"/>
<activity android:name=".ui.DetailActivity2"
android:process=":Process2" />
如图, 当在Process2中对database进行 insert/delete 写入操作时, Process1中的观察者也能响应跨进程的更新, 从而更新UI.
总结
Room 跨进程共享数据使用了两套 C/S 模型, 涉及到两个aidl类. Service 提供 ServiceBinder
, 每个 Client 也会提供 ClientBinder
. 大概流程 :
所有 Client 初始化会调用
ServiceBinder
的registerCallback()
方法将ClientBinder
传递给 Service, Service 内维护了mCallbackList
来保存所有的ClientBinder
.
当某个 Client 更新时, 会调用ServiceBinder
的broadcastInvalidation()
传播更新.Service 的
broadcastInvalidation()
会遍历ClientBinder
的集合mCallbackList
, 将更新交给每个 Client 对应的 InvalidationTracker 处理.InvalidationTracker 维护了对database数据的观察者集合
mObserverMap
, 遍历map调用observer#onInvalidated()
会触发再次查询的操作, 观察者就可以接收到新的数据.