使用RxJava简化编程

本文版权归作者所有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。

转载自夜明的孤行灯

本文链接地址: https://www.huangyunkun.com/2015/06/27/rxjava-simple-program/

最近在做一个很有趣的项目,需求明确应用界面是console的gui。

长成这样的

在现在这个年代,这种朴素的需求实在少见。

项目大量消费自有的或者外部的API,自身逻辑还是很简单。但是其中大量的异步消费远程API,解析数据,更新UI还是让人很烦。

想起来了RxJava,代码上得到了很多简化,不敢说是最佳实践,但是换一种风格总有一些新鲜的体验。

普通方法

先来个简单的,一个和比特币有关的API,https://blockchain.info/ticker
这个API返回的是当前比特币交易价格,返回内容如下:

{
USD: {
15m: 246.7,
last: 246.7,
buy: 246.65,
sell: 246.7,
symbol: "$"
},
ISK: {
15m: 32551.57,
last: 32551.57,
buy: 32544.97,
sell: 32551.57,
symbol: "kr"
},
HKD: {
15m: 1912.37,
last: 1912.37,
buy: 1911.98,
sell: 1912.37,
symbol: "$"
},
TWD: {
15m: 7628.44,
last: 7628.44,
buy: 7626.89,
sell: 7628.44,
symbol: "NT$"
},
CHF: {
15m: 230.35,
last: 230.35,
buy: 230.31,
sell: 230.35,
symbol: "CHF"
},
EUR: {
15m: 220.23,
last: 220.23,
buy: 220.19,
sell: 220.23,
symbol: "€"
},
DKK: {
15m: 1643.43,
last: 1643.43,
buy: 1643.1,
sell: 1643.43,
symbol: "kr"
},
CLP: {
15m: 156199.81,
last: 156199.81,
buy: 156168.15,
sell: 156199.81,
symbol: "$"
},
CAD: {
15m: 305.33,
last: 305.33,
buy: 305.27,
sell: 305.33,
symbol: "$"
},
CNY: {
15m: 1526.92,
last: 1526.92,
buy: 1526.62,
sell: 1526.92,
symbol: "¥"
},
THB: {
15m: 8334.08,
last: 8334.08,
buy: 8332.39,
sell: 8334.08,
symbol: "฿"
},
AUD: {
15m: 318.78,
last: 318.78,
buy: 318.72,
sell: 318.78,
symbol: "$"
},
SGD: {
15m: 331.35,
last: 331.35,
buy: 331.28,
sell: 331.35,
symbol: "$"
},
KRW: {
15m: 273699.26,
last: 273699.26,
buy: 273643.78,
sell: 273699.26,
symbol: "₩"
},
JPY: {
15m: 30518.05,
last: 30518.05,
buy: 30511.86,
sell: 30518.05,
symbol: "¥"
},
PLN: {
15m: 918.57,
last: 918.57,
buy: 918.39,
sell: 918.57,
symbol: "zł"
},
GBP: {
15m: 157.26,
last: 157.26,
buy: 157.23,
sell: 157.26,
symbol: "£"
},
SEK: {
15m: 2033.62,
last: 2033.62,
buy: 2033.2,
sell: 2033.62,
symbol: "kr"
},
NZD: {
15m: 356.96,
last: 356.96,
buy: 356.89,
sell: 356.96,
symbol: "$"
},
BRL: {
15m: 763.82,
last: 763.82,
buy: 763.66,
sell: 763.82,
symbol: "R$"
},
RUB: {
15m: 13387.32,
last: 13387.32,
buy: 13384.61,
sell: 13387.32,
symbol: "RUB"
}
}

处理的思路也简化一下,这里我们需要最新的以美元表示的交易价格,代码如下:

new Runnable() {@Overridepublicvoidrun() {
try {
String responseAsString = Request.Get("https://blockchain.info/ticker").execute().returnContent().asString();
Map < String,
Object > responseAsJsonMap =new JacksonJsonParser().parseMap(responseAsString);
Object price = ((LinkedHashMap) responseAsJsonMap.get("USD")).get("last");
usd.setText(String.valueOf(price));
}catch(IOException e) {
e.printStackTrace();
}
}
}.run();

看着有些繁琐,但是功能是实现了。

如果需要获取其他货币表示直接在其中添加即可。

但是整个项目有很多类似的代码,都是起一个线程来请求,请求以后处理并更新UI。而且处理过程非常类似,转为JsonMap,从Map中取值。

引入RxJava

RxJava背后是Netflix,对于并发模型支持较好,能够利用服务器侧的并发,而无需触及典型的线程安全和同步问题。其API的实现控制了并发原语,能追求系统性能的提升,而不必担心破坏客户端代码。

当然,还有一个名字,反应性编程,infoq中有一个专栏 http://www.infoq.com/reactive-extensions/

具体的理念啥的就不细说了,都是能Google到的。

来看下改进后的效果。

首先需要一个Observable

public Observable < String >Get(final String uri) {
return Observable.create(new Observable.OnSubscribe < String > () {@Overridepublicvoidcall(Subscriber < ?super String > subscriber) {
try {
String responseAsString = Request.Get(uri).execute().returnContent().asString();
logger.trace(responseAsString);
subscriber.onNext(responseAsString);
subscriber.onCompleted();
}catch(IOException e) {
subscriber.onError(e);
}
}
});
}

其次需要多个map

public Func1<String, Map<String, Object>> mapJson() {
return json -> jsonParser.parseMap(json);
}
public Func1<Object, Object> expression(String l) {
Expression expression = expressionParser.parseExpression(l);
return o -> expression.getValue(o);
}

一个map是对Json的转换,一个map是对SpEL的支持,方便灵活的取值。

最后来一个subscribe来更新UI

public Action1<Object>updateComponent(AbstractComponent component) {
return o -> {
Method setText = ReflectionUtils.findMethod(component.getClass(),"setText", String.class);
if (setText !=null) {
String s = String.valueOf(o);
ReflectionUtils.invokeMethod(setText, component, s);
}
};
}

最后组合在一起

helper.Get("https://blockchain.info/ticker")
.map(helper.mapJson())
.map(helper.expression("[USD][last]"))
.subscribe(helper.updateComponent(usd));

效果如下:

比特币美元价

定时更新

这里是一个单纯的数据展示,而且只会执行一次,如果需要定时执行,RxJava也有相关的支持。

RxJava提供了Schedulers来处理这种情况。具体又包括了三种

  • computationScheduler
  • ioScheduler
  • newThreadScheduler

这里选用newThreadScheduler。

worker = Schedulers.newThread().createWorker();
worker.schedulePeriodically(new Action0() {@Overridepublicvoidcall() {
//原有的代码
}
},
0,2000, TimeUnit.MILLISECONDS);

多个订阅者

如果我们需要根据统一结果做出不同的操作,比如获取API数据以后,获取两个不同的值,然后更新到两个不同的地方,那么我们就需要两个订阅者了。

使用publish方法可以声明该结果对外公布,然后添加所有所需的订阅者后connect。

worker = Schedulers.newThread().createWorker();
worker.schedulePeriodically(new Action0() {@Overridepublicvoidcall() {
ConnectableObservable < String > publish = helper.Get("https://blockchain.info/ticker").publish();
publish.map(helper.mapJson()).map(helper.expression("[USD][last]")).subscribe(helper.updateComponent(usd));
publish.map(helper.mapJson()).map(helper.expression("[CNY][last]")).subscribe(helper.updateComponent(cny));
publish.connect();
}
},
0,2000, TimeUnit.MILLISECONDS);

最后效果

写在最后

RxJava的API无疑是简单的,背后的功能和思路更是值得学习的。

所谓的反应性编程其实本质是拉模型和推模型的思考。程序=数据+算法,那么数据源是由数据源将数据推给算法,算法被动地等待数据的到来还是数据源等待算法的访问,算法主动地将数据从数据源中拉出?

回调函数是典型的推模型的应用,设计模式中的 Command,Visitor 中都有回调函数的影子。

拉模型符合人的思考逻辑,状态由运行时的栈自动维护,理解容易;推模型性能好,但写起来不容易,而且需要自己维护相关状态。

本文版权归作者所有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。

转载自夜明的孤行灯

本文链接地址: https://www.huangyunkun.com/2015/06/27/rxjava-simple-program/

发表评论