Implementing Reactive Streams Specification Interfaces
Recently I got a chance to learn and use reactive programming in java. I used the spring boot web flux reactive library which uses a project reactor under the hood. basically, the project reactor is a library that has classes that implemented the reactive streams specifications (the actual library is more complex than that, but the base of that library is that). So after I learn that I thought to create a custom java project that implements reactive streams specification interfaces from my own.
If you need to know more about reactive programming java you can refer to that from here.
Reactive Programming in Java
Reactive programming is a new paradigm that was introduced recently. Nowadays reactive programming has become a very…
Here is the custom java project that I created. you can see and use them from Github.
GitHub - jayamaljayamaha/custom-reactive-stream
You can't perform that action at this time. You signed in with another tab or window. You signed out in another tab or…
Here I have created 3 classes
CustomPublisher.java which implement the Publisher interface
CustomSubscriber.java which implements the Subscriber interface
CustomSubscription.java which implements the Subscription interface
In addition to that, I have created 3 more java classes
Data.java is the object that will be published by the publisher to the subscriber
UnexpectedCancelOperationException.java class extends the RuntimeException class which will be thrown from publisher logic when the subscriber cancels the subscription.
Driver.java class which runs the control flow of the program.
- Create the custom subscriber object in the driver class with a consumer and the number of data that need to get from the publisher. In here consumer is a functional interface where we define the way that we need to consume data from the publisher. In this program, we used console print in consumer
- Create the custom publisher object in the driver class
- Create the custom subscription object in the driver class by passing the custom subscriber and custom publisher object that we created earlier
- Call onSubscribe method in subscriber object by passing the custom subscription object.
- Call the request method of the custom subscription object with the number of data in onSubscribe method in the custom subscriber object that we call earlier.
- Call doSubscribe method of the custom publisher object by passing the number of data and custom subscriber object in request method of the custom subscription object.
- In that doSubscribe method set the data count and isCanceled to false and Call subscribe method in the same custom publisher object by passing the custom subscriber object.
- In that subscribe method the data will be published in an asynchronous way. the CompletableFuture object is used to achieve that concurrency in java.
- When publishing data, a loop was used to send the number of data that we passed earlier, and when sending, onNext method of the custom subscriber object will be called inside that loop. if isCanceled variable is true then UnexpectedCancelOperationException will be thrown by breaking the loop. finally, outside the loop, UnexpectedCancelOperationException will be caught by catch block and onError method of the custom subscriber object will be called. if all the data will be sent successfully the onComplete method of the custom subscriber object will be called.
- All above data publishing process will be happened inside a different thread asynchronously.
- In onNext method of the custom subscriber will get the data from the paramter and pass that data to the consumer which we passed in driver class.
above is the basic flow of this program. This is a very simple implementation of reactive stream, we can add more complex functionalities in further