应用开发

Java9异步编程-反应式流使用

时间:2010-12-5 17:23:32  作者:系统运维   来源:IT科技类资讯  查看:  评论:0
内容摘要:在本文中,主要研究下Java9+中的反应流。简单地说,就是使用Flow类,Flow类包含了用于构建响应式流处理的主要模块。这里说的反应流其实是一种非阻塞式背压的异步流处理标准(有点绕口)。该规范在响应

 在本文中,异步主要研究下Java9+中的编程反应流。简单地说,式流使用就是异步使用Flow类,Flow类包含了用于构建响应式流处理的编程主要模块。

这里说的式流使用反应流其实是一种非阻塞式背压的异步流处理标准(有点绕口)。

该规范在响应式宣言中定义,异步并且有各种各样的编程实现,例如RxJava或Akka-Streams。式流使用

Reactive API总览

要构建一个流,异步主要使用三个抽象,编程并将它们组合成异步处理逻辑。式流使用

每个流都需要处理由Publisher实例发布给它的异步事件;发布者有一个subscribe()的方法。

如果某个订阅者希望接收发布者发布的编程事件,则需要使用subscribe()订阅发布者。式流使用

消息的接收方需要实现订阅者接口。一般情况下,接受者是每个Flow处理的结束,因为它的实例不会进一步发送消息。

可以将Subscriber看作Sink。香港云服务器有四个方法需要重写onSubscribe(), onNext(), onError()和onComplete()。

如果希望转换传入的消息并将其进一步传递给下一个订阅服务,则需要实现Processor接口。

它既充当订阅服务(因为它接收消息),又充当发布服务(因为它处理这些消息并将它们发送以进行进一步处理)。

发布和消费消息

假设想要创建一个简单的流,其中有一个发布者发布消息,一个简单的订阅者在消息到达时使用消息。

先创建一个EndSubscriber类。需要实现订阅服务接口。接下来,重写所需的方法。

onSubscribe()方法在处理开始之前被调用。

订阅的实例subscription作为参数传递。Subscription是控制订阅服务和发布服务之间的消息流的类.

 1public class EndSubscriber<T> implements Subscriber<T> {   2       // 多少消息需要消费  3    private final AtomicInteger howMuchMessagesToConsume;  4    private Flow.Subscription subscription;  5    // 保存消费过的消息  6    public List<T> consumedElements = new LinkedList<>();  7  8    public EndSubscriber(Integer howMuchMessagesToConsume) {   9        this.howMuchMessagesToConsume = new AtomicInteger(howMuchMessagesToConsume); 10    } 11 12    @Override 13    public void onSubscribe(Flow.Subscription subscription) {  14        this.subscription = subscription; 15        subscription.request(1); 16    } 17} 

在这里还初始化了一个在测试中使用的消耗元素的空列表。

现在,需要从订阅者接口实现其余的方法。这里的主要方法是站群服务器onNext(),它在发布者发布新消息时被调用

1@Override 2public void onNext(T item) {  3    System.out.println("Got : " + item); 4    consumedElements.add(item); 5    subscription.request(1); 6} 

这里需要注意的的是,当在onSubscribe()方法中启动开始订阅时,以及当处理消息时onNext(),需要调用subscription上的request()方法来通知当前订阅器准备使用更多消息。

最后,需要实现onError(),它会在处理过程中抛出异常时被调用.

在发布者关闭时调用onComplete().

1@Override 2public void onError(Throwable t) {  3    t.printStackTrace(); 4} 5 6@Override 7public void onComplete() {  8    System.out.println("Done"); 9} 

接下来为这个处理流编写一个测试。将使用SubmissionPublisher类,这是java.util.concurrent中的一个类,它实现了Publisher接口。

测试中向发布者提交N个元素,我们的终端订阅者会接收到这些元素。

 1@Test  2public void whenSubscribeToIt_thenShouldConsumeAll()   3  throws InterruptedException {   4  5    // given  6    SubmissionPublisher<String> publisher = new SubmissionPublisher<>();  7    EndSubscriber<String> subscriber = new EndSubscriber<>();  8    publisher.subscribe(subscriber);  9    List<String> items = List.of("1", "x", "2", "x", "3", "x"); 10 11    // when 12    assertThat(publisher.getNumberOfSubscribers()).isEqualTo(1); 13    items.forEach(publisher::submit); 14    publisher.close(); 15 16    // then 17     await().atMost(1000, TimeUnit.MILLISECONDS) 18       .until( 19         () -> assertThat(subscriber.consumedElements) 20         .containsExactlyElementsOf(items) 21     ); 22} 

注意,在publisher实例上调用close()方法。它将在每个订阅者上调用onComplete()。

程序输出如下:

1Got : 1 2Got : x 3Got : 2 4Got : x 5Got : 3 6Got : x 7Done 

消息的转换

假设还希望在发布者和订阅者之间做一些数据的转换。

下面我创建一个TransformProcessor类,它实现了Processor并扩展了SubmissionPublisher,因为它同时包含Publisher和Subscriber。

并且将传入一个Function将输入转换到输出。

 1import java.util.concurrent.Flow;  2import java.util.concurrent.SubmissionPublisher;  3import java.util.function.Function;  4  5public class TransformProcessor<T,R> extends SubmissionPublisher<R> implements Flow.Processor<T,R> {   6    private Function<T,R> function;  7    private Flow.Subscription subscription;  8  9    public TransformProcessor(Function<T, R> function) {  10        super(); 11        this.function = function; 12    } 13 14    @Override 15    public void onSubscribe(Flow.Subscription subscription) {  16        this.subscription = subscription; 17        subscription.request(1); 18    } 19 20    @Override 21    public void onNext(T item) {  22        submit(function.apply(item)); 23        subscription.request(1); 24    } 25 26    @Override 27    public void onError(Throwable t) {  28        t.printStackTrace(); 29    } 30 31    @Override 32    public void onComplete() {  33        close(); 34    } 35} 

这里的TransformProcessor将把String转换为两个String,看下面我写的测试用例。网站模板

 1 @Test  2    public void whenSubscribeAndTransformElements_thenShouldConsumeAll() {   3        // given  4        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();  5        Function<String, String> dup = x -> x.concat(x);  6        TransformProcessor<String, String> transformProcessor  7                = new TransformProcessor<>(dup);  8        EndSubscriber<String> subscriber = new EndSubscriber<>(6);  9        List<String> items = List.of("1", "2", "3"); 10        List<String> expectedResult = List.of("11", "22", "33"); 11        // when 12        publisher.subscribe(transformProcessor); 13        transformProcessor.subscribe(subscriber); 14        items.forEach(publisher::submit); 15        publisher.close(); 16 17        await().atMost(1000, TimeUnit.MILLISECONDS) 18                .untilAsserted(() -> assertTrue(subscriber.consumedElements.containsAll(expectedResult))); 19    } 

使用订阅控制消息需求

假设只想消费第一个消息,应用一些逻辑并完成处理。可以使用request()方法来实现这一点。

修改下代码:

 1public class EndSubscriber<T> implements Flow.Subscriber<T> {   2    // 多少消息需要消费  3    private final AtomicInteger howMuchMessagesToConsume;  4    private Flow.Subscription subscription;  5    // 保存消费过的消息  6    public List<T> consumedElements = new LinkedList<>();  7  8    public EndSubscriber(Integer howMuchMessagesToConsume) {   9        this.howMuchMessagesToConsume = new AtomicInteger(howMuchMessagesToConsume); 10    } 11 12    @Override 13    public void onSubscribe(Flow.Subscription subscription) {  14        this.subscription = subscription; 15        subscription.request(1); 16    } 17 18    @Override 19    public void onNext(T item) {  20        howMuchMessagesToConsume.decrementAndGet(); // 减一 21        System.out.println("Got : " + item); 22        consumedElements.add(item); 23        if (howMuchMessagesToConsume.get() > 0) {  24            subscription.request(1); 25        } 26    } 27 28    @Override 29    public void onError(Throwable t) {  30        t.printStackTrace(); 31    } 32 33    @Override 34    public void onComplete() {  35        System.out.println("Done"); 36    } 37} 

测试

 1@Test  2public void whenRequestForOnlyOneElement_thenShouldConsumeOne(){   3    // given  4    SubmissionPublisher<String> publisher = new SubmissionPublisher<>();  5    EndSubscriber<String> subscriber = new EndSubscriber<>(1);  6    publisher.subscribe(subscriber);  7    List<String> items = List.of("1", "x", "2", "x", "3", "x");  8    List<String> expected = List.of("1");  9 10    // when 11    assertEquals(publisher.getNumberOfSubscribers(),1); 12    items.forEach(publisher::submit); 13    publisher.close(); 14 15    // then 16    await().atMost(1000, TimeUnit.MILLISECONDS) 17            .untilAsserted(() -> 18                    assertTrue(subscriber.consumedElements.containsAll(expected)) 19            ); 20} 

尽管发布者发布了6个元素,但EndSubscriber将只使用一个元素,因为它表示只需要处理这一个元素。

通过在Subscription上使用request()方法,我们可以实现更复杂的回压机制来控制消息消费的速度。

copyright © 2025 powered by 益强资讯全景  滇ICP备2023006006号-31sitemap