A system delivers better throughput when various parts of it work fairly independently without blocking each other. Such a requirement demands asynchronous communication between consumers and providers. This third part of the ‘The Design Odyssey’ series explores a few of the GoF (Gang of Four) behavioural patterns that help in designing asynchronous communication at the object level. These patterns lay the foundation for popular messaging products like Apache Kafka and many others.
A consumer communicates its request to the provider and the provider, in turn, communicates the response to the consumer. Communication takes place in different forms.
Synchronous communication
In this mode of communication, the consumer sends a request to the provider and waits until it receives the response before proceeding further. In other words, the consumer gets blocked while the provider processes the request.
For example, in the following snippet, a consumer waits until the calculator returns the sum of two numbers.
calculator.add(10, 30);
Though there are valid cases for this mode of communication, as a side effect, it impacts the overall throughput of the system badly. What if the calculator takes an inordinate amount of time to respond? What if someone else is waiting for a response from the consumer object? As you can see, it can cascade further and the entire system may come to a standstill. For this reason, most modern systems avoid blocking communication between consumers and providers.
Callback
One of the time-tested mechanisms for establishing non-blocking communication between consumers and providers is the use of callbacks. The following snippet captures the essence of such a mechanism:
calculator.add(10, 30, sum -> { print(sum); stop(music); }); play(music);
The objective of the above code is to play some music while the provider computes the sum of the numbers, and to stop the music once the results are printed. As a pattern, the consumer wants to do something (like playing music in this case) while the provider is busy processing the request. This is impossible in synchronous communication.
The third parameter in the add() call above is famously known as a callback function. A callback function is always executed sometime in the future after the original request is completed. The provider that offers such a mechanism immediately returns the control back to the consumer after receiving the request. The consumer is free to do whatever it wants while the provider is busy with request processing. Once the provider completes processing the request, the callback function is executed. For all this to happen, the provider and the consumer run in separate threads or processes, and an underlying infrastructure handles the coordination.
Publish-Subscribe
The callback mechanism has a severe limitation, in its simplest form. It can handle only one-to-one communication between the provider and consumer. The consumers should be aware of who is the provider to make the request. When several providers and several consumers are involved, implementing a callback mechanism leads to the notorious callback hell.
A better alternative is to make the providers and consumers completely independent of each other. In other words, let the consumer only publish its request on a designated channel, instead of reaching out to a specific provider. On the other side, let the providers subscribe to the channel, process the requests, and publish the responses on a designated channel. The consumers are expected to subscribe to the responses from that channel.
The beauty of this mechanism lies in the fact that the providers and consumers are not even aware of each other. They only know the channels, not the publishers and subscribers. And, no one is blocking others. There can be any number of publishers and any number of subscribers. Moreover, there is no limit on the number of channels, number of requests, and number of responses. Since the requests and responses are decoupled, they are referred to as just messages. A message may carry a request, another message may carry a notification and some other message may carry a response.
The infrastructure that manages the channels, subscriptions, message delivery, etc, is the key in this mechanism. It is modelled along the lines of the following three behavioural patterns.
Mediator: In this pattern, a mediator acts as the infrastructure that connects the publishers with the subscribers. Several subscribers subscribe with the mediator for receiving the messages. Several publishers publish messages to the mediator. The mediator delivers the message to the subscribers. The mediator can also be equipped with some processing logic in terms of filtering the messages, routing the messages to a specific group of subscribers, etc. In some cases, a domain-specific mediator may also have business logic. Usually, an entity may act both as a publisher as well as a subscriber for different kinds of messages.
Observer: In this pattern, the subscribers register with a subject for receiving messages from specific sources. Often the sources are referred to as observables, messages are referred to as events, and subscribers are referred to as observers, in this pattern. The subject keeps the observers informed of the events from the observables.
In both the above patterns, the middle layer (mediator or subject) acts as the infrastructure through which the events or messages are delivered from the publishers or observables to the subscribers or observers.
Command: Though not specifically designed for this purpose, the command is essentially a callback function in the form of an object. In both mediator and observer patterns, it is the command object that processes a message or an event. The command object and its handling function are expected to be self-sufficient in such a way that they have all the information required for processing.
Messaging infrastructure
Having seen the basics of messaging patterns, it’s time to build infrastructure for enabling asynchronous communication among the objects. Such an infrastructure is application-agnostic. Any application that requires asynchronous communication should be able to use our messaging infrastructure.
Here are the simple requirements:
- Objects should be able to register with the infrastructure for receiving messages from anyone.
- Objects should be able to register only for certain types of messages.
- Objects should be able to publish messages.
Implementation
Let’s start with an interface that offers functions to subscribe, unsubscribe and publish:
package com.glarimy.framework; public interface Broker { public long subscribe(Subscriber subscriber); public void unsubscribe(long sid); public void publish(String message, String type); }
The idea is that once a subscriber is registered with the Broker, it returns a unique ID that can be used for unsubscribing in the future. Also, the publish() method accepts both the message as well as the type of the message.
The Subscriber is an interface with only one method that is meant for handling the received message.
package com.glarimy.framework; public interface Subscriber { public void on(String message, String type); }
Different applications may have different implementations for the Subscriber interface. Since the type of the message is also passed as a parameter, the on() method can have a switch statement to deal with messages based on the type.
The simplest possible implementation of the Broker can store all the registered subscribers in memory.
package com.glarimy.framework; import java.util.HashMap; import java.util.Map; public class MessageBroker implements Broker { private Map<Long, Subscriber> subscribers; public MessageBroker() { subscribers = new HashMap<Long, Subscriber>(); } ... }
The subscribe() method generates a unique ID before storing the subscriber in its list. A rudimentary implementation can use the current time as the ID, though it is not the best option for obvious reasons.
@Override public long subscribe(Subscriber subscriber) { long sid = new Date().getTime(); subscribers.put(sid, subscriber); return sid; }
The unsubscribe() method requires no explanation.
@Override public void unsubscribe(long sid) { subscribers.remove(sid); }
And the publish() method iterates through all the subscribers and calls the on() method on each one of them.
@Override public void publish(String message, String type) { subscribers.values().stream().forEach(s -> s.on(message, type)); }
Note that this implementation does not give the non-blocking flavour of asynchronous communication. It only decouples the consumers and the providers. In order for real non-blocking communication, the publish() method can make use of a thread pool and call the on() method in separate threads.
This setup is depicted in Figure 1.
One semantic limitation of the above design is that a subscriber receives all the messages and uses a switch statement to handle them. This does not help us in improving the modularity of the application. When a new type of message is to be handled, the on() method of the subscriber needs to be modified with an additional case in the switch. Similar modifications are expected also when a type of message is to be ignored or handled differently.
Instead, an interface like Handler in the following snippet can be designed only to handle one type of message.
package com.glarimy.framework; public interface Handler { public void handle(String message); }
As can be observed, the handler just receives a message that can be handled without a switch statement. In other words, a given application can have different handler implementations for different types of messages.
Accordingly, the broker is refactored to register handlers, instead of subscribers.
package com.glarimy.framework; public interface Broker { public long register(Handler handler); public void unregister(long hid); public void publish(String message, String type); }
The broker implementation maintains a list of handlers and delivers the messages only to the matching handlers based on the type of message published. In order to identify the type of message that a handler is interested in, the interface can be updated as follows:
package com.glarimy.framework; public interface Handler { public void handle(String message); public String getType(); }
The refactored implementation of the broker maintains the list of handlers.
package com.glarimy.framework; import java.util.Date; import java.util.HashMap; import java.util.Map; public class MessageBroker implements Broker { private Map<Long, Handler> handlers; public MessageBroker() { handlers = new HashMap<Long, Handler>(); } ... }
And the registration and removal of the handlers are on the expected lines.
@Override public long register(Handler handler) { long hid = new Date().getTime(); handlers.put(hid, handler); return hid; } @Override public void unregister(long sid) { handlers.remove(sid); }
However, the publish() method requires small modifications. Earlier, all the messages were delivered to all the subscribers. That has to be changed. It must only deliver messages to the handlers that are interested in the type of the message. Accordingly, the following is the updated publish() method.
@Override public void publish(String message, String type) { handlers.values().stream() .filter(h -> h.getType().equalsIgnoreCase(type)) .forEach(s -> s.handle(message)); }
Figure 2 depicts the updated broker design.
We will revisit the design of the message broker in the future for a deeper treatment. Since our current focus is on the fundamental object-oriented patterns, the MessageBroker in the current form is just enough to demonstrate the behavioural patterns.
UMS with messaging
The existing user management system (UMS) developed so far is intrinsically synchronous in nature. The consumers and providers are aware of each other. With the MessageBroker at its disposal, the UMS can be upgraded to use asynchronous communication.
Instead of the UserRepositoryJournalProxy directly calling the JournalAdapter, we can decouple them by using the broker. Let the UserRepositoryJournalProxy send messages to the broker and thereby the JournalHandler processes the messages. To connect these two virtually, the messages can be typed as com.glarimy.ums.
Following is the implementation of JournalHandler:
package com.glarimy.ums; import com.glarimy.framework.Factory; import com.glarimy.framework.Handler; import com.glarimy.framework.ObjectFactory; public class JournalHandler implements Handler { private JournalAdapter adapter; public JournalHandler() throws Exception { Factory factory = new ObjectFactory(“config.properties”); this.adapter = (JournalAdapter) factory.get(“adapter”); } @Override public void handle(String message) { adapter.record(message); } @Override public String getType() { return “com.glarimy.ums”; } }
The corresponding implementation of UserRepositoryJournalProxy is:
package com.glarimy.ums; import com.glarimy.framework.Broker; import com.glarimy.framework.Factory; import com.glarimy.framework.ObjectFactory; public class UserRepositoryJournalProxy implements UserRepository { private UserRepository target; private Broker broker; private static final String TOPIC = “com.glarimy.ums”; public UserRepositoryJournalProxy(UserRepository target) throws Exception { this.target = target; Factory factory = new ObjectFactory(“config.properties”); this.broker = (Broker) factory.get(“broker”); this.broker.register(new JournalHandler()); } ... }
Being a proxy, the UserRepositoryJournalProxy publishes messages to the broker as part of its pre- and post-processing routines.
@Override public void add(Long phone, String name) { broker.publish(“c=repository, s=add, a=” + “&” + name, TOPIC); target.add(phone, name); broker.publish(“c=repository, s=add, r=void”, TOPIC); } @Override public String find(Long phone) { broker.publish(“c=repository, s=find, a=” + phone, TOPIC); String name = target.find(phone); broker.publish(“c=repository, s=find, r=” + name, TOPIC); return name; }
The updated design of the UMS is depicted in Figure 3.
An important observation is that the UserRepositoryJournalProxy and the JournalAdapter are now completely decoupled. In fact, in response to the messages published by the UserRepositoryJournalProxy, any number of handlers on the topic com.glarimy.ums can respond by their own processing. For example, a handler may send an email and another handler may send an SMS confirming the addition of a user. Registering or unregistering such handlers does not have any bearing on the rest of the system.
If the broker invokes the handlers in separate threads, this implementation realises a complete non-blocking asynchronous communication.
With this, we leave the object-oriented design using GoF patterns. In the next part, we will explore domain-driven design.