在本文中,异步主要研究下Java9+中的编程反应流。简单地说,式流使用就是异步使用Flow类,Flow类包含了用于构建响应式流处理的编程主要模块。
这里说的式流使用反应流其实是一种非阻塞式背压的异步流处理标准(有点绕口)。
该规范在响应式宣言中定义,异步并且有各种各样的编程实现,例如RxJava或Akka-Streams。式流使用
要构建一个流,异步主要使用三个抽象,编程并将它们组合成异步处理逻辑。式流使用
每个流都需要处理由Publisher实例发布给它的异步事件;发布者有一个subscribe()的方法。
如果某个订阅者希望接收发布者发布的编程事件,则需要使用subscribe()订阅发布者。式流使用
消息的接收方需要实现订阅者接口。一般情况下,接受者是每个Flow处理的结束,因为它的实例不会进一步发送消息。
可以将Subscriber看作Sink。香港云服务器有四个方法需要重写onSubscribe(), onNext(), onError()和onComplete()。
如果希望转换传入的消息并将其进一步传递给下一个订阅服务,则需要实现Processor接口。
它既充当订阅服务(因为它接收消息),又充当发布服务(因为它处理这些消息并将它们发送以进行进一步处理)。
假设想要创建一个简单的流,其中有一个发布者发布消息,一个简单的订阅者在消息到达时使用消息。
先创建一个EndSubscriber类。需要实现订阅服务接口。接下来,重写所需的方法。
onSubscribe()方法在处理开始之前被调用。
订阅的实例subscription作为参数传递。Subscription是控制订阅服务和发布服务之间的消息流的类.
1public class EndSubscriber<T> implements Subscriber<T> { 2 // 多少消息需要消费 3 private final AtomicInteger howMuchMessagesToConsume; 4 private Flow.Subscription subscription; 5 // 保存消费过的消息 6 public List<T> consumedElements = new LinkedList<>(); 7 8 public EndSubscriber(Integer howMuchMessagesToConsume) { 9 this.howMuchMessagesToConsume = new AtomicInteger(howMuchMessagesToConsume); 10 } 11 12 @Override 13 public void onSubscribe(Flow.Subscription subscription) { 14 this.subscription = subscription; 15 subscription.request(1); 16 } 17}在这里还初始化了一个在测试中使用的消耗元素的空列表。
现在,需要从订阅者接口实现其余的方法。这里的主要方法是站群服务器onNext(),它在发布者发布新消息时被调用
1@Override 2public void onNext(T item) { 3 System.out.println("Got : " + item); 4 consumedElements.add(item); 5 subscription.request(1); 6}这里需要注意的的是,当在onSubscribe()方法中启动开始订阅时,以及当处理消息时onNext(),需要调用subscription上的request()方法来通知当前订阅器准备使用更多消息。
最后,需要实现onError(),它会在处理过程中抛出异常时被调用.
在发布者关闭时调用onComplete().
1@Override 2public void onError(Throwable t) { 3 t.printStackTrace(); 4} 5 6@Override 7public void onComplete() { 8 System.out.println("Done"); 9}接下来为这个处理流编写一个测试。将使用SubmissionPublisher类,这是java.util.concurrent中的一个类,它实现了Publisher接口。
测试中向发布者提交N个元素,我们的终端订阅者会接收到这些元素。
1@Test 2public void whenSubscribeToIt_thenShouldConsumeAll() 3 throws InterruptedException { 4 5 // given 6 SubmissionPublisher<String> publisher = new SubmissionPublisher<>(); 7 EndSubscriber<String> subscriber = new EndSubscriber<>(); 8 publisher.subscribe(subscriber); 9 List<String> items = List.of("1", "x", "2", "x", "3", "x"); 10 11 // when 12 assertThat(publisher.getNumberOfSubscribers()).isEqualTo(1); 13 items.forEach(publisher::submit); 14 publisher.close(); 15 16 // then 17 await().atMost(1000, TimeUnit.MILLISECONDS) 18 .until( 19 () -> assertThat(subscriber.consumedElements) 20 .containsExactlyElementsOf(items) 21 ); 22}注意,在publisher实例上调用close()方法。它将在每个订阅者上调用onComplete()。
程序输出如下:
1Got : 1 2Got : x 3Got : 2 4Got : x 5Got : 3 6Got : x 7Done假设还希望在发布者和订阅者之间做一些数据的转换。
下面我创建一个TransformProcessor类,它实现了Processor并扩展了SubmissionPublisher,因为它同时包含Publisher和Subscriber。
并且将传入一个Function将输入转换到输出。
1import java.util.concurrent.Flow; 2import java.util.concurrent.SubmissionPublisher; 3import java.util.function.Function; 4 5public class TransformProcessor<T,R> extends SubmissionPublisher<R> implements Flow.Processor<T,R> { 6 private Function<T,R> function; 7 private Flow.Subscription subscription; 8 9 public TransformProcessor(Function<T, R> function) { 10 super(); 11 this.function = function; 12 } 13 14 @Override 15 public void onSubscribe(Flow.Subscription subscription) { 16 this.subscription = subscription; 17 subscription.request(1); 18 } 19 20 @Override 21 public void onNext(T item) { 22 submit(function.apply(item)); 23 subscription.request(1); 24 } 25 26 @Override 27 public void onError(Throwable t) { 28 t.printStackTrace(); 29 } 30 31 @Override 32 public void onComplete() { 33 close(); 34 } 35}这里的TransformProcessor将把String转换为两个String,看下面我写的测试用例。网站模板
1 @Test 2 public void whenSubscribeAndTransformElements_thenShouldConsumeAll() { 3 // given 4 SubmissionPublisher<String> publisher = new SubmissionPublisher<>(); 5 Function<String, String> dup = x -> x.concat(x); 6 TransformProcessor<String, String> transformProcessor 7 = new TransformProcessor<>(dup); 8 EndSubscriber<String> subscriber = new EndSubscriber<>(6); 9 List<String> items = List.of("1", "2", "3"); 10 List<String> expectedResult = List.of("11", "22", "33"); 11 // when 12 publisher.subscribe(transformProcessor); 13 transformProcessor.subscribe(subscriber); 14 items.forEach(publisher::submit); 15 publisher.close(); 16 17 await().atMost(1000, TimeUnit.MILLISECONDS) 18 .untilAsserted(() -> assertTrue(subscriber.consumedElements.containsAll(expectedResult))); 19 }假设只想消费第一个消息,应用一些逻辑并完成处理。可以使用request()方法来实现这一点。
修改下代码:
1public class EndSubscriber<T> implements Flow.Subscriber<T> { 2 // 多少消息需要消费 3 private final AtomicInteger howMuchMessagesToConsume; 4 private Flow.Subscription subscription; 5 // 保存消费过的消息 6 public List<T> consumedElements = new LinkedList<>(); 7 8 public EndSubscriber(Integer howMuchMessagesToConsume) { 9 this.howMuchMessagesToConsume = new AtomicInteger(howMuchMessagesToConsume); 10 } 11 12 @Override 13 public void onSubscribe(Flow.Subscription subscription) { 14 this.subscription = subscription; 15 subscription.request(1); 16 } 17 18 @Override 19 public void onNext(T item) { 20 howMuchMessagesToConsume.decrementAndGet(); // 减一 21 System.out.println("Got : " + item); 22 consumedElements.add(item); 23 if (howMuchMessagesToConsume.get() > 0) { 24 subscription.request(1); 25 } 26 } 27 28 @Override 29 public void onError(Throwable t) { 30 t.printStackTrace(); 31 } 32 33 @Override 34 public void onComplete() { 35 System.out.println("Done"); 36 } 37}测试
1@Test 2public void whenRequestForOnlyOneElement_thenShouldConsumeOne(){ 3 // given 4 SubmissionPublisher<String> publisher = new SubmissionPublisher<>(); 5 EndSubscriber<String> subscriber = new EndSubscriber<>(1); 6 publisher.subscribe(subscriber); 7 List<String> items = List.of("1", "x", "2", "x", "3", "x"); 8 List<String> expected = List.of("1"); 9 10 // when 11 assertEquals(publisher.getNumberOfSubscribers(),1); 12 items.forEach(publisher::submit); 13 publisher.close(); 14 15 // then 16 await().atMost(1000, TimeUnit.MILLISECONDS) 17 .untilAsserted(() -> 18 assertTrue(subscriber.consumedElements.containsAll(expected)) 19 ); 20}尽管发布者发布了6个元素,但EndSubscriber将只使用一个元素,因为它表示只需要处理这一个元素。
通过在Subscription上使用request()方法,我们可以实现更复杂的回压机制来控制消息消费的速度。