Intro

Las primitivas para escribir código concurrente es uno de los puntos más fuertes de Go. Estas primitivas son las goroutines y los channels

Para no estar todo el rato escribiendo goroutine usaré como sinónimo rutina y para los channels usaré canal.

  • Goroutines

Las goroutines son hilos de ejecución ligeros que corren en el mismo espacio de direcciones. Son ligeros en el sentido de que el coste es reservar espacio en el stack y crecen reservando (y eliminando) memoria en el heap según sea necesario. Las goroutines se multiplexan en múltiples hilos del sistema operativo; si una rutina se bloquea, por E/S u otro motivo, otras goroutines siguen ejecutándose.

Al compartir espacio de memoria el acceso a variables compartidas debe sincronizarse.

  • Channels

Los channels se utilizan para compartir información entre las goroutines.

Por defecto enviar y recibir son bloqueantes hasta que el emisor y el receptor estén listos para la comunicación.

Esto se verá mejor con los ejemplos a lo largo del post.

  • Waitgroup

Un Waitgroup se utiliza para coordinar la ejecución de múltiples goroutines y esperar a que todas ellas finalicen antes de continuar con la ejecución del programa.

Goroutine

La sintaxis para lanzar una rutina es sencillo, basta con escribir delante de una función la palabra reservada go.

package main

import (
  "fmt"
)

func say(s string) {
    fmt.Println(s)
}

func main() {
    go say("world")
    say("hello")
}

El resultado del programa anterior es simplemente “hello” en lugar de “world hello”. Esto merece una breve explicación.

Cuando el programa se encuentra con: go say(“world”) ejecuta la función say en una rutina de forma independiente y sale inmediatamente. La siguiente línea es la función regular say(“hello”) que pertenece al hilo principal, al finalizar como el hilo principal ha terminado, el programa termina y finaliza.

Si queremos ver como se ejecuta la rutina agregamos un delay en el hilo principal para que le de tiempo a la rutina a ejecutarse. Esta no es la manera correcta de trabajar, para eso están los canales.

package main

import (
    "fmt"
    "time"
)

func say(s string) {
    fmt.Println(s)
}

func main() {
    go say("world")
    say("hello")
    time.Sleep(100 * time.Millisecond)
}

Channels

Un objeto channel sirve para enviar y recibir datos a través de el. Un objeto de tipo channel se crea con la función make.

El operador para utilizar los canales es: <-

ch <- v //Send v to channel ch
v := <-ch //Receive from ch and assign it to value v
//the data flows in the direction of the arrow

Veamos un pequeño código que devuelve los números impares de una lista de números. El cálculo lo hace una rutina y el resultado se devuelve a través de un canal

package main

import (
"fmt"
)

func is_odd(n int) bool {
    if n % 2 != 0 {
        return true
    }
    return false
}

func get_odd_numbers(ch chan []int, num_list []int) {
    odd_num_list := make([]int, 1) //Create a slice with at least 1 element
    for _, num := range num_list {
        if is_odd(num) {
            //the append func will resize the slice
            odd_num_list = append(odd_num_list, num)
        }
    }
    ch <- odd_num_list
}

func main() {
    num_list := make([]int, 10)
    //Create a slice with the first 10 numbers
    for i := range num_list {
        num_list[i] = i
    }

    ch := make(chan []int)
    //calculate the odd numbers in a goroutine
    go get_odd_numbers(ch, num_list)
    //get the result from goroutine
    odd_num_list := <- ch
    fmt.Printf("odd numbers = %v\n", odd_num_list)
}

Como se puede observar utilizar concurrencia es realmente sencillo.

Sincronización usando channels

Recuperando el ejemplo anterior veamos un fragmento de código para sincronizar una goroutine y el hilo principal

package main

import (
   "fmt"
)

func say(done chan bool, s string) {
    fmt.Println(s)
    done <- true // set done to true
}

func main() {
    done := make(chan bool)
    go say(done, "Hello world")
    <- done //Wait for goroutine to finish
}

Buffered and unbuffered channels

La diferencia entre un buffered y unbuffered channel radica en que se puede crear un canal con un tamaño determinado, de manera que se puede ir enviando encolando mensajes hasta llenarse, y cuando lo haga se envía todos los mensajes de una.

unbuffered_channel := make(chan int)

//Sends to a buffered channel. It blocks when the buffer is full and the
//receiver blocks when buffer is empty
buffered_channel := make(chan int, 100)

Closing channels

Cerrar un canal indica que el sender no va a enviar más valores al receiver. Solo el sender debe cerrar un canal, nunca el receiver.

package main

import (
  "fmt"
)

//sender
func linear_gen(c chan int, max int) {
    for i := 0; i < max; i++ {
        c <- i
    }
    close(c)
}

func receiver(c chan int) {
    //loops until channel c is closed
    for v := range c {
        fmt.Printf("%d\n", v)
    }
}

func receiver2(c chan int) {
    for {
        //when ok is false means no more values
        //will be received
        n, ok := <- c
        if !ok {
            break
        }
        fmt.Printf("%d\n", n)
    }
}

func main() {
    c := make(chan int, 20)

    go linear_gen(c, 100)
    receiver(c)
}

Channel directions

Cuando se usan los canales como argumentos de una función se puede especificar si un canal es solo de entrada o solo de salida. La dirección del canal se comprueba en tiempo de compilación, si se usa erróneamente se lanza un error de compilación.

package main

import "fmt"

func echo(input <-chan string, output chan<- string) {
    msg := <- input
    output <- msg
}

func main() {
    input := make(chan string)
    output := make(chan string)

    go echo(input, output)
    input <- "Hello world"

    msg := <- output
    fmt.Printf("%s\n", msg)
}

Select

La instrucción select permite esperar en diferentes canales. Esta sentencia bloquea la ejecución de una goroutine hasta que uno de los casos del select se puedan ejecutar. Si múltiples canales están listos selecciona uno aleatoriamente.

Además la sentencia select cuenta con un default case que se ejecuta cuando ningún canal esté listo.

package main

import (
    "fmt"
    "time"
)

func main() {
    ch1, ch2 := make(chan string), make(chan string)

    go func() {
        ch1 <- "goroutine 1"
    }()
    go func() {
        ch2 <- "goroutine 2"
    }()


    //delay necessary to give time to goroutine to run
    time.Sleep(time.Second * 1)
    for i := 0; i < 5; i++ {
        select {
        case msg1 := <- ch1:
            fmt.Printf("%s\n", msg1)
        case msg2 := <- ch2:
            fmt.Printf("%s\n", msg2)
        default:
            fmt.Printf("default select case\n")
        }
    }
}

Timeouts

También es posible utilizar timeouts para evitar tener que estar esperando un mensaje a través de un canal. Se puede utilizar en conjunto con la sentencia select.

package main

import (
    "fmt"
    "time"
)

func long_running_task(result chan int) {
    time.Sleep(time.Hour)
    result <- 0
}

func main() {
    c := make(chan int)

    go long_running_task(c)

    select {
    case result := <- c:
        fmt.Printf("Long running task result=%d\n", result)
    case <-time.After(time.Second * 10):
        fmt.Printf("10 seconds elapsed and long running task didn't finish, exit\n")
    }
}

WaitGroup

Para esperar a que finalicen múltiples goroutines se utiliza el objeto WaitGroup.

package main

import (
    "fmt"
    "sync"
    "time"
)

func long_running_task(wg *sync.WaitGroup) {
    time.Sleep(time.Second  * 3)
    wg.Done()
}

func main() {
    var wg sync.WaitGroup
    const numTasks = 5
    wg.Add(numTasks)

    for i := 0; i < numTasks; i++ {
        go long_running_task(&wg)
    }
    wg.Wait()

    fmt.Printf("Finish")
}

Workers pool

Para terminar con la concurrencia vamos a implementar un patrón bastante común que es un worker pool. Una cola de workers que aceptan tareas para ejecutarse de forma concurrente.

La idea es usar channels para ir enviando tareas a los workers y otro canal para capturar el resultado. Cuando no haya más tareas, se cierra el canal con close.

Hay que tener en cuenta que el tamaño de los canales debe ser igual al número de tareas a ejecutar para evitar deadlocks, ya que de lo contrario un podría ocurrir que el hilo principal se bloquee porque no hay más espacio en el canal para enviar un job y la goroutine se bloquee porque no se está desencolando los resultados desde el hilo principal.

package main

import (
    "fmt"
    "time"
)

func worker(id int, job <-chan int, result chan<- int) {
    for range job {
        time.Sleep(time.Second * 1)
        result <- id
    }
}

func main() {
    const MAX_WORKERS = 5
    const MAX_TASKS = 20
    
    job := make(chan int, MAX_TASKS)
    result := make(chan int, MAX_TASKS)

    //Launch the workers
    for i := 0; i <= MAX_WORKERS; i++ {
        //Each worker will wait until a job is received
        go worker(i, job, result)
    }

    //Send job to workers
    for i := 0; i <= MAX_TASKS; i++ {
        //Just send something to unblock the worker
        //this could be the input for the worker
        job <- 0
    }

    //Wait all result
    for i := 0; i <= MAX_TASKS; i++ {
        id := <- result
        fmt.Printf("Worker[%d] finished\n", id)
    }
    //close the job channel to unblock the goroutine
    close(job)
}

Effective Go

Go by examples