一:线程理解
线程,有时被称为轻量级进程(Lightweight Process,LWP),是程序执行流的最小单元。一个标准的线程由线程ID,当前指令指针(PC),寄存器集合和堆栈组成。
二:线程的应用
以Android程序为例,它得所有方法默认实在主线程执行如下:
public class CourseTwoActivity extends Activity {
private final static String TAG = "CTWO";
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_course_2);
Log.d(TAG,Thread.currentThread().getName());
}
}
运行结果如下:
03-03 16:24:40.188 3637-3637/com.pse.rxandroid D/CTWO: main
对于RxJava我们的线程又是什么样的呢?
public class CourseTwoActivity extends Activity {
private final static String TAG = "CTWO";
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_course_2);
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
Log.d(TAG,Thread.currentThread().getName());
e.onNext("大保健");
e.onNext("桑拿");
e.onNext("K歌");
e.onNext("开房");
e.onComplete();
}
});
Consumer<String> consumer = new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
Log.d(TAG,s+"处在线程:"+Thread.currentThread().getName());
}
};
observable.subscribe(consumer);
}
}
上面代码中我们创建了Observerable (被观察者/数据发射者)还有Consumer(观察者/数据消费者)之间又通过subScribe进行订阅。
同时上面我们打印了每个对象所处在的线程,运行结果如下:
03-03 16:37:37.348 15430-15430/com.pse.rxandroid D/CTWO: main
03-03 16:37:37.348 15430-15430/com.pse.rxandroid D/CTWO: 大保健处在线程:main
03-03 16:37:37.348 15430-15430/com.pse.rxandroid D/CTWO: 桑拿处在线程:main
03-03 16:37:37.348 15430-15430/com.pse.rxandroid D/CTWO: K歌处在线程:main
03-03 16:37:37.348 15430-15430/com.pse.rxandroid D/CTWO: 开房处在线程:main
大家可以看出Android中默认的都是在主线程执行。但是我们平常遇到很多耗时操作,这是不能放在主线程执行的,不然就会造成RuntimeException,也就是运行时异常。
那么我们来修改下RxJava的代码:
public class CourseTwoActivity extends Activity {
private final static String TAG = "CTWO";
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_course_2);
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
Log.d(TAG,Thread.currentThread().getName());
e.onNext("大保健");
e.onNext("桑拿");
e.onNext("K歌");
e.onNext("开房");
e.onComplete();
}
});
Consumer<String> consumer = new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
Log.d(TAG,s+"处在线程:"+Thread.currentThread().getName());
}
};
observable.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(consumer);
}
}
修改后的运行结果如下:
03-03 16:47:28.418 26446-26468/? D/CTWO: RxNewThreadScheduler-1
03-03 16:47:28.443 26446-26446/? D/CTWO: 大保健处在线程:main
03-03 16:47:28.443 26446-26446/? D/CTWO: 桑拿处在线程:main
03-03 16:47:28.443 26446-26446/? D/CTWO: K歌处在线程:main
03-03 16:47:28.443 26446-26446/? D/CTWO: 开房处在线程:main
在我们修改的代码中为Observerable设置了新的线程Schedulers.newThrea(), 同时我们又制定Observer运行的线程是 AndroidSchedulers.mainThread().
可以看出RxJava可以灵活的指定 “消息发送者” “消息处理者” 所运行的线程。
subscribeOn()可以多次指定但是仅仅第一次生效
observeOn()亦可以多次指定每指定一次,都会进行线程切换
三:运用
我们先写一个文件读写的工具类代码如下:
public class FileUtils {
/**
* 读取文本内容
* @param context
* @param fileName
* @return
*/
public static String getStringFromAssets(Context context,String fileName){
try {
InputStream in = context.getResources().getAssets().open(fileName);
byte[] bytes = new byte[in.available()];
in.read(bytes);
in.close();
String str = new String(bytes,"utf-8");
return str;
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
}
我们将文件放在Assets目录下面,来读取。
那么我们在Activity中的具体操作如下。
public class CourseTwoActivity extends Activity {
private final static String TAG = "CTWO";
TextView tv_text;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_course_2);
tv_text = (TextView)findViewById(R.id.tv_text);
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
Log.d(TAG,Thread.currentThread().getName());
e.onNext(FileUtils.getStringFromAssets(CourseTwoActivity.this,"Gank.txt"));
e.onComplete();
}
});
Consumer<String> consumer = new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
Log.d(TAG,Thread.currentThread().getName());
tv_text.setText(s);
}
};
observable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(consumer);
}
}
关于RxJava的线程介绍就到这里,下来我们就会遇到RxJava的操作符。