Original article was published on my website: https://promisefemi.vercel.app/blog/grpc-client-connection-poolin
Recently I was working on a daemon for consuming MQTT messages sent from thousands of IOT devices. One of the requirements was that the daemon needed to be able to handle about 15k messages per second (I know right ๐ฎ ).
Without going into too much detail (that would be for another blog) we decided to cut the round trip of our MySQL database completely and introduce a gRPC server that would act as a buffer between our daemon and the database. By doing that our daemon would simply drop messages off to the gRPC server and the server would queue and write to the database on its own time.
Building the gRPC server was the easy part the hard part is managing the needs of the daemons, a number of goroutines would need to send messages to the gRPC server, but using the same client connection would significantly affect performance since new requests would have to wait for the connection to finish whatever task is currently being handled, just imagine 14K+ messages waiting for one connection to become free. At this point it was clear using one connection was a no-go and creating new connections is out of the picture (creating new connections is very expensive here, so we decided to use a connection pool.
A lot of the ideas here were inspired by This Blog, i strongly recommend it.
If you donโt know what a connection pool is I strongly advise that you read the blog I linked above.
Jumping In
If you donโt want to go through everything, i have already created a go package that can be used right away. https://github.com/promisefemi/grpc-client-pool
Before anything else, let's talk about how to create new gRPC client connections
conn, err := grpc.Dial(":9000", grpc.WithTransportCredentials(insecure.NewCredential()), grpc.WithBlock())
if err != nil{
// Handle error
}
// create a new client and use connection
or with context
ctx, cancelCtx := context.WithTimeout(context.Background(), 3 * time.Second)
conn, err := grpc.DialContext(ctx,":9000", grpc.WithTransportCredentials(insecure.NewCredential()), grpc.WithBlock())
if err != nil{
// Handle error
}
// create a new client and use connection
Pool Structure
At its core, a connection pool contains three important properties maxOpenConnection
, maxIdleConnection
and idleConnections
maxOpenConnection - indicates how many connections we are allowed to open before we have to start queue connection requests.
maxIdleConnection - how many connections are we allowed to keep before we start closing connections?
idleConnections - a list of connections that can be reused.
There are also two important methodsGet
and put
these methods are responsible for getting and caching connections. Altogether these properties and methods are responsible for managing our connection pool.
type ClientPool struct {
mu sync.Mutex //mutex to protect the idleConnections and numOfOpenConnection properties
address string // gRPC server address
configOptions []grpc.DialOption // gRPC dial configurations
maxOpenConnection int // maximum number of open connections allowed
maxIdleConnection int // maximum number of idle connections allwed
idleConnections map[string]*ClientCon // list of idle connections
numOfOpenConnection int // number of currently open connections
}
ClientCon is a custom type, so let's create that as well
type ClientCon struct {
id string
pool *ClientPool
Conn *grpc.ClientConn
}
Now let us proceed with our put
and get
methods
Put method
The responsibility of the put method is very simple, it simply takes a connection and returns it back to the list of idle connections (the pool) or closes it if there is no room.
func (cp *ClientPool) put(conn *ClientCon) {
//Lock the pool and defer the unlock
cp.mu.Lock()
defer cp.mu.Unlock()
// check to see if we can return the connection back to the idleConnection
if cp.maxIdleConnection >= len(cp.idleConnections) {
cp.idleConnections[conn.id] = conn
} else {
// if the number of idle connections are more than allowed, close the connection
cp.numOfOpenConnection--
_ = conn.Conn.Close()
}
}
and thatโs about it, the put method is done.
Get method
The get method is a little bit different, the get method has to handle three different scenarios:
Return a connection from the list of idle connections if there is any available
Queue connection request if we have surpassed the number of connections we can create
Create a new connection
Let's see how this translates into code.
func (cp *ClientPool) Get() (*ClientCon, error) {
//Lock the pool
cp.mu.Lock()
// check if we have any connection available
if len(cp.idleConnections) > 0 {
// return the first connection available, and remove from the list of idleConnection
for _, val := range cp.idleConnections {
delete(cp.idleConnections, val.id)
// increment the number of open connections
cp.numOfOpenConnection++
cp.mu.Unlock()
return val, nil
}
}
...
For the first step, we check if we have any idle connections and if we do return the first connection, and increment the list of open connections.
If there are no available connections we move to the next step.
Before we start creating new connections, its important to know if we have not surpassed the number of open connections we are allowed to create, and if we have we would need to queue the connection request and resolve it at a later time.
Go channels are coming to the rescue here. But before we do that we need to introduce a new type thatโs going to act as the channel type.
The queueChan
struct contains two properties a, this channel is used to receive a ClientCon
request once it is fulfilled and an errorChan
a channel that receives an error if a connection request could not be fulfilled.
// A queue struct ( The struct is not a channel i just named it that way)
type queueChan struct {
connectionChan chan *ClientCon
errorChan chan error
}
and let's add an update to the ClientPool
to contain the channel that would act as the queue.
type ClientPool struct {
...
connectionQueue chan *queueChan // channel for queuing connection requests when there are no idle connections left.
}
Back in the get method, we need to create a new queue request for a connection, pass that request to the pool connection queue then use a select to block until the request is resolved
...
// Check if we are allowed to open new connections
if cp.maxOpenConnection > 0 && cp.numOfOpenConnection >= cp.maxOpenConnection {
// create a new queue request
queueRequest := &queueChan{
connectionChan: make(chan *ClientCon),
errorChan: make(chan error),
}
// pass request into connection queue
cp.connectionQueue <- queueRequest
// block with a select until the request is fuffiled or an error occurs instead
select {
case conn := <-queueRequest.connectionChan:
cp.numOfOpenConnection++
cp.mu.Unlock()
return conn, nil
case err := <-queueRequest.errorChan:
cp.mu.Unlock()
return nil, err
}
}
...
With that we have successfully queued the connection request and when an idle connection is available the request would be resolved. We would come back to how we intend on resolving requests from the connection queue.
The last part of the get method is very simple, at this point, there is no idle connection and we can create a new connection (instead of queuing it).
conn, err := cp.openConnection()
if err != nil {
return nil, err
}
cp.numOfOpenConnection++
cp.mu.Unlock()
return conn, nil
}
func (cp *ClientPool) openConnection() (*ClientCon, error) {
// Dial a new grpc client connection or use DialContext if that works for you
newConn, err := grpc.Dial(cp.address, cp.configOptions...)
if err != nil {
return nil, err
}
// create a new ClientCon
return &ClientCon{
id: fmt.Sprintf("%v", time.Now().Unix()),
pool: cp,
Conn: newConn,
}, nil
}
and all together
// A queue struct ( The struct is not a channel i just named it that way)
type queueChan struct {
connectionChan chan *ClientCon
errorChan chan error
}
func (cp *ClientPool) Get() (*ClientCon, error) {
//Lock the pool
cp.mu.Lock()
// check if we have any connection available
if len(cp.idleConnections) > 0 {
// return the first connection available, and remove from the list of idleConnection
for _, val := range cp.idleConnections {
delete(cp.idleConnections, val.id)
// increment the number of open connections
cp.numOfOpenConnection++
cp.mu.Unlock()
return val, nil
}
}
// Check if we are allowed to open new connections
if cp.maxOpenConnection > 0 && cp.numOfOpenConnection >= cp.maxOpenConnection {
// create a new queue request
queueRequest := &queueChan{
connectionChan: make(chan *ClientCon),
errorChan: make(chan error),
}
// pass request into connection queue
cp.connectionQueue <- queueRequest
// block with a select until the request is fuffiled or an error occurs instead
select {
case conn := <-queueRequest.connectionChan:
cp.numOfOpenConnection++
cp.mu.Unlock()
return conn, nil
case err := <-queueRequest.errorChan:
cp.mu.Unlock()
return nil, err
}
}
conn, err := cp.openConnection()
if err != nil {
return nil, err
}
cp.numOfOpenConnection++
cp.mu.Unlock()
return conn, nil
}
func (cp *ClientPool) openConnection() (*ClientCon, error) {
// Dial a new grpc client connection or use DialContext if that works for you
newConn, err := grpc.Dial(cp.address, cp.configOptions...)
if err != nil {
return nil, err
}
// create a new Clientcon
return &ClientCon{
id: fmt.Sprintf("%v", time.Now().Unix()),
pool: cp,
Conn: newConn,
}, nil
}
And with all that we are almost done. Remember when we queued connection requests? well it's time to resolve those requests.
To do that we need to add a method that would run on a separate goroutine, constantly waiting for queued connection requests and the chance to resolve them. let's call this method handleConnectionQueue
Handling Connection Queue
There are three basic things we need to care about when handling connection queue requests:
Watching for new requests in the queue
Checking for a newly idle connection
Creating a connection if there are no idle connections and if we are allowed to do so.
As you may have noticed there are a lot of similarities with the Get
method, but something to take note of is that we are not returning an error immediately after a connection request fails instead we will continue to retry for a given time frame before deciding that the request failed.
Let us begin
func (cp *ClientPool) handleConnectionQueue() {
for rq := range cp.connectionQueue {
var (
hasTimedOut = false
hasCompleted = false
timeout = time.After(time.Duration(3) * time.Second)
)
//continually try to get/create a connection until timeout or connection completed
for {
if hasCompleted || hasTimedOut {
break
}
//continually check for timeout or try to get/create a connection
select {
case <-timeout:
hasTimedOut = true
rq.errorChan <- ErrConnectionWaitTimeout
default:
...
And within the default:
we would be doing two things, trying to get an idle connection or trying to create a new connection
...
// first check if an idle connection is available
cp.mu.Lock()
numberOfIdleConnections := len(cp.idleConnections)
if numberOfIdleConnections > 0 {
for _, val := range cp.idleConnections {
delete(cp.idleConnections, val.id)
cp.mu.Unlock()
rq.connectionChan <- val
hasCompleted = true
break
}
} else if cp.maxOpenConnection > 0 && cp.maxOpenConnection > cp.numOfOpenConnection {
//check if pool has not exceeded number of allowed open connections
// increase numberOfConnection hoping new connection would be created
// unlock mutext to free up resources for other connection, since creating new connections could take a while
cp.numOfOpenConnection++
cp.mu.Unlock()
conn, err := cp.openConnection()
//ignoring error because the only error we care about is the timeout, we continue to retry
cp.mu.Lock()
cp.numOfOpenConnection--
cp.mu.Unlock()
if err == nil {
rq.connectionChan <- conn
hasCompleted = true
}
} else {
//unlock pool and restart
cp.mu.Unlock()
}
}
}
}
}
All together
func (cp *ClientPool) handleConnectionQueue() {
for rq := range cp.connectionQueue {
var (
hasTimedOut = false
hasCompleted = false
timeout = time.After(time.Duration(3) * time.Second)
)
//continually try to get/create a connection until timeout or connection completed
for {
if hasCompleted || hasTimedOut {
break
}
//continually check for timeout or try to get/create a connection
select {
case <-timeout:
hasTimedOut = true
rq.errorChan <- ErrConnectionWaitTimeout
default:// first check if a idle connection is available
cp.mu.Lock()
numberOfIdleConnections := len(cp.idleConnections)
if numberOfIdleConnections > 0 {
for _, val := range cp.idleConnections {
delete(cp.idleConnections, val.id)
cp.mu.Unlock()
rq.connectionChan <- val
hasCompleted = true
break
}
} else if cp.maxOpenConnection > 0 && cp.maxOpenConnection > cp.numOfOpenConnection {
//check if pool has not exceeded number of allowed open connections
// increase numberOfConnection hoping new connection would be created
// unlock mutext to free up resources for other connection, since creating new connections could take a while
cp.numOfOpenConnection++
cp.mu.Unlock()
conn, err := cp.openConnection()
//ignoring error because the only error we care about is the timeout, we continue to retry
cp.mu.Lock()
cp.numOfOpenConnection--
cp.mu.Unlock()
if err == nil {
rq.connectionChan <- conn
hasCompleted = true
}
} else {
//unlock pool and restart
cp.mu.Unlock()
}
}
}
}
}
That is most of what we need to build a functional pool.
We still need to write a method to release a connection when you're done using it, for ease of usage we can bind that method to the ClientCon
. All the method does is call the ClientPool
put
method, as i have stated above all the put method does is return a connection back to the list of idle connections or close it completely.
func (c *ClientCon) Release() {
c.pool.put(c)
}
And for the last part, we want to write a function that helps us create a new pool, configure it and start the handleConnectionQueue
goroutine
type PoolConfig struct {
MaxOpenConnection int // number of open Connections
MaxIdleConnection int // number of idle connections
ConnectionQueueLength int //buffer capacity of the queue channel
Address string // gRPC server address
ConfigOptions []grpc.DialOption // gRPC connection options
}
func NewClientPool(config *PoolConfig) *ClientPool {
clientPool := &ClientPool{
mu: sync.Mutex{},
address: config.Address,
configOptions: config.ConfigOptions,
maxOpenConnection: config.MaxOpenConnection,
maxIdleConnection: config.MaxOpenConnection,
numOfOpenConnection: 0,
connectionQueue: make(chan *queueChan, config.ConnectionQueueLength),
idleConnections: make(map[string]*ClientCon, 0),
}
// start goroutine
go clientPool.handleConnectionQueue()
return clientPool
}
For real now, that's it we have successfully created a gRPC client connection pool, to create a new pool we would simply
...
//Create a new pool config
poolConfig := &pool.PoolConfig{
MaxOpenConnection: 10,
MaxIdleConnection: 10,
ConnectionQueueLength: 10000,
Address: ":9000",
ConfigOptions: []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock(),
},
}
// create a new client pool
connPool := pool.NewClientPool(poolConfig)
//get a new ClientConn
client, err := connPool.Get()
if err != nil {
log.Fatalf("%s", err)
}
// Setup a new grpc client
// send a request using the conn
userMessage := &proto.UserMessage{
FirstName: "Promise",
LastName: "Femi",
Email: "",
}
uc := proto.NewUserClient(client.Conn)
response, err := uc.Set(context.Background(), userMessage)
if err != nil {
fmt.Printf("error unable to set user -- %s -- %d\n", err, connPool.GetNumberOfOpenConnections())
} else {
fmt.Printf("%+v -- %d \n", response, connPool.GetNumberOfOpenConnections())
}
// remember to release connections.
client.Release()
We are at the end of a great journey ๐๐พ. Thank you for reading my article, and you can reach out to me if you have any feedback โ๐พ.
I have already created a package that implements all the features outlined in this article @ https://github.com/promise/grpc-client-pool
Top comments (0)