The Publisher Subscriber pattern, also known as PubSub, is an architectural pattern for relaying messages to interested parties through a publisher. The publisher is not generally aware of the subscribers per say but in our implementation it will so that we can ease into the topic.
The PubSub pattern gives us a scalable way to relay messages around our applications but is inflexible in one area and that is the data structure that is sent to each subscriber when a new message is published. Generally this is a good thing though in my opinion since it allows a nice normalised way of transacting data through our applications.
Tests
For the tests I will be using JavaScript and the Jest test runner.
const Publisher = require('./publisher');
let publisher;
beforeEach(() => publisher = new Publisher);
describe("Publisher", () => {
it("Should construct with default values", () => {
expect(publisher.topic).toEqual("unknown");
expect(publisher.subscribers).toEqual([]);
});
it("Should add subscribers properly", () => {
const subscriber = jest.fn();
expect(publisher.subscribers.length).toEqual(0);
publisher.subscribe(subscriber);
expect(publisher.subscribers.length).toEqual(1);
});
it("Should publish updates to subscribers", () => {
const subscriber = jest.fn();
publisher.subscribe(subscriber);
publisher.publish("test");
expect(subscriber).toHaveBeenCalledWith({
topic: "unknown",
data: "test"
});
});
it("Should unsubscribe from updates as required", () => {
const subscriber = jest.fn();
const subscription = publisher.subscribe(subscriber);
publisher.publish("test");
expect(subscriber).toHaveBeenCalledTimes(1);
publisher.unsubscribe(subscription);
publisher.publish("test");
expect(subscriber).toHaveBeenCalledTimes(1);
});
it("Should not unsubscribe a subscriber from updates unless it exists", () => {
const subscriber = jest.fn();
publisher.subscribe(subscriber);
expect(publisher.subscribers.length).toEqual(1);
publisher.unsubscribe(() => 24);
expect(publisher.subscribers.length).toEqual(1);
});
it("Generates a consistent subscription id for each subscriber", () => {
const subscriber = jest.fn();
const subscription = publisher.subscribe(subscriber);
const proof = publisher.createSubscriptionId(subscriber);
expect(subscription).toEqual(proof);
});
});
Here we test that:
- We begin with sane defaults
- We can add subscribers
- We can notify subscribers
- We can remove subscribers
- We only remove subscribers when they exist
- We generate consistent ids for each subscriber that is provided
You can run the tests here:
This covers the bases required of a publisher and subscriber and gives us control over who does and does not get notifications when new content is published. Pretty simple so far, right?
Implementation
For our implementation I will be using TypeScript, a typed superset of JavaScript. If you are more comfortable with JavaScript you can compile TypeScript code to JavaScript in the TypeScript playground.
export interface ISubscriberOutput {
topic: string;
data: any;
};
export class Publisher {
public topic: string = "unknown";
private subscribers: Function[] = [];
public subscribe(subscriberFn: Function): number {
this.subscribers = [...this.subscribers, subscriberFn];
const subscriptionId = this.createSubscriptionId(subscriberFn);
return subscriptionId;
}
public publish(data: any): void {
this.subscribers.forEach((subscriberFn: Function) => {
const output: ISubscriberOutput = { topic: this.topic, data };
subscriberFn(output);
});
}
public unsubscribe(subscriptionId: number): void {
const subscriberFns = [...this.subscribers];
subscriberFns.forEach((subscriberFn: Function, index: number) => {
if(this.createSubscriptionId(subscriberFn) === subscriptionId) {
subscriberFns.splice(index, 1);
this.subscribers = [...subscriberFns];
}
});
}
private createSubscriptionId(subscriberFn: Function): number {
const encodeString = this.topic + subscriberFn.toString();
return [...encodeString].reduce((accumulator, char) => {
return char.charCodeAt(0) + ((accumulator << 5) - accumulator);
}, 0);
}
}
This class generates a Publisher with a set of methods for us to use for publishing updates, subscribing to those updates and also unsubscribing when the need arises. Let's break things down from top to bottom.
export interface ISubscriberOutput {
topic: string;
data: any;
};
This interface is able to be used by subscribers that will take in messages when the publish
method is called on the Publisher
and gives us the structured message output we discussed in the introduction of this article.
public topic: string = "unknown";
private subscribers: Function[] = [];
As we begin to define the Publisher
class, we first initialise the class with a topic of "unknown" since the topic hasn't been provided or overridden. We also have an array of subscribers
initialised, each of which should be a Function
.
Next we create the subscribe
method. This will add the provided subscriberFn
function to the subscribers
array and then return a subscriptionId
for us to use later should we choose to unsubscribe down the road.
public subscribe(subscriberFn: Function): number {
this.subscribers = [...this.subscribers, subscriberFn];
const subscriptionId = this.createSubscriptionId(subscriberFn);
return subscriptionId;
}
The createSubscriptionId
generates a unique ID for each subscriber and utilises the same algorithm as the the Java String hashCode() Method.
private createSubscriptionId(subscriberFn: Function): number {
const encodeString = this.topic + subscriberFn.toString();
return [...encodeString].reduce((accumulator, char) => {
return char.charCodeAt(0) + ((accumulator << 5) - accumulator);
}, 0);
}
In short we take the current topic
and add to that the string representation of the subscriberFn
. This gives us a somewhat unique string but is not bulletproof by any means. From here we take each character in the encodeString
and reduce it to a number representation unique to that string.
Sidenote: We use the
<<
operator which seems to confuse a lot of developers but in this case we are just doing this in essence:accumulator * (2 ** 5)
.So let's say it is the second iteration of the
reduce
loop and accumulator is at an arbitrary value of 65 which is the character code for lowercase "a".This being the case the
(accumulator << 5) - accumulator
line is the same as writing(65 * (2 ** 5)) - 65
during that iteration.Either way you write it, the resulting number will be 2015 for that example.
Hopefully that makes a bit more sense for you who may not have experienced using bitwise operators before.
I highly recommend reading the MDN bitwise reference for more information.
If we want to unsubscribe from a Publisher
at any time, you can simply call the unsubscribe
method passing in the return value of the original subscribe
call.
public unsubscribe(subscriptionId: number): void {
const subscriberFns = [...this.subscribers];
subscriberFns.forEach((subscriberFn: Function, index: number) => {
if(this.createSubscriptionId(subscriberFn) === subscriptionId) {
subscriberFns.splice(index, 1);
this.subscribers = [...subscriberFns];
}
});
}
Here we clone the current subscribers and loop over the clone until we find one that when it is hashed in the createSubscriptionId
function, matches the provided subscriptionId
value.
If we find a match then we remove that function from the subscriberFns
array and set the subscribers
to contain only the remaining subscriberFns
.
Lastly we will look at the publish
function which takes in some data
which can be anything you wish to broadcast to the subscribers
.
public publish(data: any): void {
this.subscribers.forEach((subscriberFn: Function) => {
const output: ISubscriberOutput = { topic: this.topic, data };
subscriberFn(output);
});
}
We loop over the current subscribers
and notify each one with an object matching the ISubscriberOutput
structure.
Overall this implementation keeps things concise and to the point.
Example usage
An example use case could be an article publisher which notifies subscribers when new articles get published. It could look like this for example:
Conclusions
I like this pattern and how it allows a scalable and predictable messaging format and how flexible it can be to the needs of what you are building.
I think this ties in nicely with other architectural patterns like the microservices pattern which uses event queues to pass information around in a way that is not too dissimilar to PubSub.
Hopefully you found some value in todays post and you can make use of this pattern in the future!
Top comments (4)
I like how you explained it - thanks for sharing!
You're welcome Tamimi, I'm glad you found value in the post, thanks for dropping by!
I'm recently getting more into pub/sub and event-driven development, I attempted to build an event-driven NodeJS application on covid data that I wrote a blog post about here if you want to check it out dev.to/tweettamimi/how-i-built-an-...! It's pretty cool and I'm planing to build more side projects with it
I'll be sure to check that out, thanks for the share.