Implementing Reactive Streams Specification Interfaces

Jayamal Jayamaha
3 min readMar 22, 2022

Recently I got a chance to learn and use reactive programming in java. I used the spring boot web flux reactive library which uses a project reactor under the hood. basically, the project reactor is a library that has classes that implemented the reactive streams specifications (the actual library is more complex than that, but the base of that library is that). So after I learn that I thought to create a custom java project that implements reactive streams specification interfaces from my own.

If you need to know more about reactive programming java you can refer to that from here.

Custom Project

Here is the custom java project that I created. you can see and use them from Github.

Here I have created 3 classes

CustomPublisher.java which implement the Publisher interface

CustomSubscriber.java which implements the Subscriber interface

CustomSubscription.java which implements the Subscription interface

In addition to that, I have created 3 more java classes

Data.java is the object that will be published by the publisher to the subscriber

UnexpectedCancelOperationException.java class extends the RuntimeException class which will be thrown from publisher logic when the subscriber cancels the subscription.

Driver.java class which runs the control flow of the program.

Program Flow

  • Create the custom subscriber object in the driver class with a consumer and the number of data that need to get from the publisher. In here consumer is a functional interface where we define the way that we need to consume data from the publisher. In this program, we used console print in consumer
  • Create the custom publisher object in the driver class
  • Create the custom subscription object in the driver class by passing the custom subscriber and custom publisher object that we created earlier
  • Call onSubscribe method in subscriber object by passing the custom subscription object.
  • Call the request method of the custom subscription object with the number of data in onSubscribe method in the custom subscriber object that we call earlier.
  • Call doSubscribe method of the custom publisher object by passing the number of data and custom subscriber object in request method of the custom subscription object.
  • In that doSubscribe method set the data count and isCanceled to false and Call subscribe method in the same custom publisher object by passing the custom subscriber object.
  • In that subscribe method the data will be published in an asynchronous way. the CompletableFuture object is used to achieve that concurrency in java.
  • When publishing data, a loop was used to send the number of data that we passed earlier, and when sending, onNext method of the custom subscriber object will be called inside that loop. if isCanceled variable is true then UnexpectedCancelOperationException will be thrown by breaking the loop. finally, outside the loop, UnexpectedCancelOperationException will be caught by catch block and onError method of the custom subscriber object will be called. if all the data will be sent successfully the onComplete method of the custom subscriber object will be called.
  • All above data publishing process will be happened inside a different thread asynchronously.
  • In onNext method of the custom subscriber will get the data from the paramter and pass that data to the consumer which we passed in driver class.

above is the basic flow of this program. This is a very simple implementation of reactive stream, we can add more complex functionalities in further

--

--