|1. | Modelo de concurrencia
|2. | Goroutines y canales
|3. | Wait Groups
|4. | Select y Worker pools
|5. | ErrGroup (error groups)
Para qué sirve select en Go?
Bueno llegamos a la cuarta entrega, en este caso vamos a esar hablando de la palabra reservada select
y una pequeña implementación en un worker pool (vamos a explicar que es un worker pool).
Select es un bloque, tiene su sintaxis y su scope comprende todo lo que se ocupa dentro de sus llaves. Su función es el de poner en espera (o wait) a múltiples comunicacions a la goroutine que se está ejecutando, se va a liberar cuando se cumpla alguna de las condiciones que planteamos dentro de nuestro bloque. En otras palabras, en parte de nuestro código que podemos recibir mas de una rutina, es dónde vamos a usar el select.
La sintaxis básica es:
//...
select {
//...
}
Nota importate: El select bloquea la rutina que lo contiene, muchas veces vamos a ver que nuevas goroutines lanzan un select
, tengan esto muy en cuenta y hagan las pruebas necesarias para saber que no van a bloquear la rutina principal.
Nota importante II: No usen select vacios.
Ahora para agregarle algo interesante a nuestro select
, dentro del bloque vamos a estar esperando por mensajes que lleguen a los canales que invoquemos. Funciona parecido a la cláusula switch
, pero la condición es quien reciba el mensaje, no hace ningún tipo de pattern matching.
Vamos a una impl real
c1 := make(chan string)
c2 := make(chan string)
// enviamos el primer mensaje.
go func() {
time.Sleep(1 * time.Second)
c1 <- "one"
}()
// enviamos el primer mensaje.
go func() {
time.Sleep(2 * time.Second)
c2 <- "two"
}()
// con este select esperamos dos veces, uno por cada
// canal. Otra práctica común es tener un for infinito
// en una rutina separada para asi no bloquear a la
// principal.
for i := 0; i < 2; i++ {
select {
case msg1 := <-c1:
fmt.Println("received", msg1)
case msg2 := <-c2:
fmt.Println("received", msg2)
}
}
Aca vemos como en cada case, esperamos leer un mensaje de cada canal. Una vez que ocurra, vamos a salir del bloque.
Ahora, otro ejemplo que nos sirve como un daemon en nuestro servicio, que corra cada X tiempo y eternamente mientras la app este en ejecución.
import "time"
//
//
go func() {
ticker := time.NewTicker(1 * time.Hour)
for {
select {
case <-ticker.C:
//doSomething()
}
}
}()
Este código nos asegura que cada una hora vamos a invocar a lo que nosotros le indiquemos. El ticker es un countdown que nos provee el paquete time, muy útil para estos casos.
Resumen del select
- Bloquea a la rutina que lo esta ejecutando.
- Cuando entra en un case, va a anular a los otros hasta que vuelva a ejecutarse el select (por eso es tan comun verlo en un for{} infinito).
- Suelen ir dentro de una función anónima lanzada en una goroutine.
Worker pools, que son?
sabemos que un pool es una reserva, en este caso de workers. Ahora en nuestro código tenemos que identificar 3 cosas claras. 1) El pool 2) El worker y 3) El trabajo a realizar.
El objetivo va a ser que podamos procesar trabajo de forma concurrente, los worker que terminan su trabajo, van de nuevo al pool y así tratar de ahorrar recursos, además de tratar de trabajar más eficientemente.
Vamos a un poco de go.
type Unit interface {
Job() error
}
Definimos lo que va a ser nuestra unidad de trabajo. Podríamos crear otras interfaces más y hacerlo mas polimórfico. No nos preocupemos por quien lo implementa, sera trabajo del cliente.
var jobQueue chan Unit // cola de trabajo compartida
type worker struct {
pool chan chan Unit
jobCh chan Unit
}
func newWorker(pool chan chan Unit) *worker {
return &worker{
jobCh: make(chan Unit),
pool: pool,
}
}
Acá ya tenemos un worker que va a pertener a un pool y escuchar a un canal de unidades de trabajo. El doble channel para el pool es para el registro y otro para activarlo.
La otra función, simplemete actua de constructor.
type pool struct {
pool chan chan Unit
workers int
}
func NewPool(maxWorkers int) *pool {
return &pool{
pool: make(chan chan Unit, maxWorkers),
workers: maxWorkers,
}
}
Ahora tenemos la struct de pool, que contiene el mismo canal que el worker y cuantos workers queremos crear, acompañado de su constructor.
Tenemos el esqueleto, ahora pasamos a la acción con los métodos.
func (d *pool) Run() {
for i := 0; i < d.workers; i++ {
w := newWorker(d.pool)
w.start()
}
go d.dispatch()
}
Ya se ve algo interesante, por cada worker en que indicamos, vamos a crear uno, asignale el pool que ya teníamos y arrancarlo, veamos de que se trata.
func (w *worker) start() {
go func() {
for {
// register the actual worker in the queue.
w.pool <- w.jobCh
select {
case job := <-w.jobCh:
// do the actual job here
err := job.Job()
if err != nil {
log.Println(err.Error())
}
}
}
}()
}
Para el método start, vamos a registrar el worker y luego con la cláusula select, vamos a esperar a que llegue un trabajo y realizarlo (sin saber que es, pero no nos importa). Todo esto en una nueva rutina por worker.
func (d *pool) dispatch() {
go func() {
for {
select {
case job, ok := <-jobQueue:
if ok {
jobChannel := <-d.pool
jobChannel <- job
}
}
}
}()
}
Por último, el dispatcher
que va a distribuir la carga de trabajo, cada vez que de la jobQueue, se toma uno disponible del pool y a ese worker (implícito en el job channel) se le asigna el trabajo. La job queue es un recurso compartido.
Código en el cliente
func main() {
// init the shared data structure.
splanner.InitQueue(20)
// init the dispatcher & keep it listening.
splanner.NewPool(15).Run()
s := http.Server{}
s.Addr = ":8080"
http.HandleFunc("/jobs", JobHandler)
log.Fatal(s.ListenAndServe())
}
func JobHandler(w http.ResponseWriter, r *http.Request) {
log.Println("getting job...")
q := r.URL.Query().Get("q")
if q == "" {
q = "default"
}
for a := 0; a < 100; a++ {
work := HeavyWork{Name: q, number: a}
splanner.AddUnit(&work)
_, _ = w.Write([]byte(fmt.Sprintf("job %s %d done", work.Name, a)))
}
}
type HeavyWork struct {
Name string `json:"name"`
number int
}
func (p *HeavyWork) Job() error {
time.Sleep(500 * time.Millisecond)
fmt.Println(fmt.Sprintf("heavy job is running %d", p.number))
return nil
}
Así de simple queda par ael cliente, tiene que implementar la interfaz Unit, con la funcion Job() y agregar la unidad de trabajo con una funcion exportada AddUnit que encola el trabajo. En main
vemos el setup es extremadamente trivial.
El codigo completo, disponible en github
Conclusiones
Vemos el poder y flexibilidad que nos da el select para orquestar los workers y combinado con el pool, poder despachar rutinas, obtener métricas, etc etc.
No nos olvidemos que hay que estudiarlo, usarlo debidamente y sobre todo, probarlo en muchos escenarios para no llevarnos sosrpresas no gratas en producción.
Ya saben, si quieren sponsorearme, pueden hacerlo acá!
Top comments (5)
¡Excelente artículo! Me gustaría felicitar al autor por abordar este tema de manera tan completa y detallada. Las secciones presentadas, desde el modelo de concurrencia hasta los grupos de error, proporcionan una visión muy completa de cómo trabajar con Goroutines y canales en Go.
Muchas gracias! dentro de poco la ultima entrega
Muy buen artículo, bien detallado para algo tan útil como el select, para poder manejar de forma más ordenada el flujo de nuestras goroutinas con los channels.
Muchas gracias gabi!
Gracias Tomi, muy bueno!