Channels

Jose PauloJose Paulo
15 min read

Apesar de ser um assunto já bem conhecido e largamente documentado por vários posts e pela própria documentação da linguagem, resolvi abordar esse tema pra aprofundar meus conhecimentos e pra contribuir de alguma forma com outros desenvolvedores. Penso que a melhor forma de entender um assunto seja explicando para outras pessoas, e essa é uma forma de fazer isso.

O que são ?

Channels são uma estrutura de Go que podemos utilizar para comunicarmos valores entre uma goroutine e outra, seja entre a goroutine principal e uma outra que foi criada, ou entre goroutines independentes entre si.

A ideia de channels, bem como todo o sistema de concorrência de Go, foi inspirado em um padrão chamado CSP (Comunicating Sequential Process) criado por C. A. R. Hoare, que define um sistema como conjunto de processos independentes entre si e que se comunicam através de mensagens bem definidas, o que difere de sistemas baseados em mutex, que também podem ser implementados em go, ou em locks, onde os sistemas independentes, aqui chamados de goroutines, se comunicam através de variáveis compartilhadas na memória.

Nesse modelo de concorrência estamos dividindo o mesmo recurso entre agentes diferentes que desejam acessá-lo de forma independente e, por vezes, simultânea. Para resolver esse problema criamos um lock, uma especie de semáforo que visa organizar o acesso ao recurso, no caso, a memória compartilhada.

O padrão CSP, e por consequência os channels de go, propõe uma mudança nessa ótica invertendo o fluxo e focando no objetivo principal, que nesse caso não é necessariamente dividir um recurso compartilhado mas sim comunicar informações entre processos, dessa forma o canal de comunicação precisa ser implementado de forma a permitir a comunicação segura entre os processos diferentes, essa é a magia que os channels de Go possibilitam.

Como Utilizar ?

Existem varias formas de utilizar channels uma vez que são em essência uma forma de comunicação entre processos concorrentes, de forma que vamos dar uma olhada em alguns exemplos.

Pub Sub

Esse exemplo é o mais simples da lista, ilustra apenas a comunicação entre duas goroutines, a principal, responsável pela função main do programa, e a criada a partir dela, chamando a função sayHello precedida pela palavra-chave go, dessa forma a linha não fica bloqueada e a execução segue a diante. A próxima parada é a declaração e atribuição da variável helloMsg, porém o valor atribuído a ela é o que sair do channel ch, o que faz com que essa linha seja ”bloqueada” e aguarde algo sair do channel. voltando alguns passos atrás, na função sayHello colocamos no channel o valor "Pickle Riiiick" , o que por sua vez sai na linha X onde a declaração da variável aguarda por um valor.

func main() {
    ch := make(chan string)

    go sayHello(ch)

    helloMsg := <-ch
    fmt.Println(helloMsg)
}

func sayHello(ch chan string) {
    ch <- "Pickle Riiiick"
}

Range

O exemplo anterior, apesar de ser bem ilustrativo, explora pouco do potencial dos channels, vamos verificar um pouco mais do que podemos fazer com eles.

func main() {
    ch := make(chan string)

    go doWork(ch)

    produceWork(ch)
}

func doWork(ch chan string) {
    for s := range ch {
        fmt.Println("recebemos pelo channel", s)
    }
}

func produceWork(ch chan string) {
    for i := 0; i < 5; i++ {
        fmt.Println("produzindo trabalho", i)
        ch <- fmt.Sprintf("numero %d", i)
    }
    close(ch)
}

Neste exemplo temos duas funções doWork e produceWork, uma produzindo no channel e outra lendo dele e processando algo. Podemos ver, talvez mais claramente, aqui que o channel fez de fato uma ponte segura entre as duas goroutines. Na função main iniciamos o channel ch e chamamos a função doWork em uma goroutine separada, que então aguarda algo no channel. Quando chamamos a função produceWork ela começa a inserir no channel que por sua vez começa a entregar na função doWork que está rodando em outra goroutine.

Este código pode parecer ok de um primeiro ponto de vista, porém se analisarmos seu output podemos ver que existe um problema.

produzindo trabalho 0
produzindo trabalho 1
recebemos pelo channel numero 0
recebemos pelo channel numero 1
produzindo trabalho 2
produzindo trabalho 3
recebemos pelo channel numero 2
recebemos pelo channel numero 3
produzindo trabalho 4

O “trabalho 4” foi produzido mas nunca consumido. O que podemos aprender com esse bug é que ao pensarmos em código concorrente precisamos ter atenção redobrada aos detalhes, pois a concorrência não é uma característica tão explicita e pode ser traiçoeira a um olhar mais desatento, devemos sempre ter em mente que o programa termina mesmo que alguma goroutine ainda não tenha terminado seu trabalho, por isso devemos projetar nosso programa de forma a evitar que isso aconteça. Uma correção simples seria inverter as goroutines colocando o consumer na goroutine principal, desta forma o programa não terminaria antes de consumir todos os trabalhos processados.

func main() {
    ch := make(chan string)

    go produceWork(ch)

    doWork(ch)
}

func doWork(ch chan string) {
    for s := range ch {
        fmt.Println("recebemos pelo channel", s)
    }
}

func produceWork(ch chan string) {
    for i := 0; i < 5; i++ {
        fmt.Println("produzindo trabalho", i)
        ch <- fmt.Sprintf("numero %d", i)
    }
    close(ch)
}

Desta forma o retorno fica assim.

produzindo trabalho 0
produzindo trabalho 1
recebemos pelo channel numero 0
recebemos pelo channel numero 1
produzindo trabalho 2
produzindo trabalho 3
recebemos pelo channel numero 2
recebemos pelo channel numero 3
produzindo trabalho 4
recebemos pelo channel numero 4

Outra forma de fazer com que o ultimo valor seja processado é aguardando um pouco, por exemplo com um time.Sleep ou usando um wait group, porém como estamos tentando entender os channels vamos focar neles por hora.

Buffer

Nos exemplos anteriores usamos apenas channels sem definir um buffer, a principal diferença é que o buffer faz o channel funcionar como um estacionamento com número de vagas limitado, sempre que lota, se alguém quiser entrar precisa esperar até que o outro saia, é ai que ocorre o bloqueio, a goroutine fica aguardando liberar uma vaga, vamos ver isso no exemplo.

func main() {
    ch := make(chan string, 2)

    go func() {
        ch <- "mensagem 1"
        ch <- "mensagem 2"
        ch <- "mensagem 3" // bloqueia até algo ser lido
    }()

    for i := 0; i < 3; i++ {
        msg := <-ch
        fmt.Println("Recebido:", msg)
    }
}

Aqui o texto “mensagem 3” só entrará no channel quando uma vaga for liberada, no caso quando “mensagem 1” for lido. Funcionando de forma diferente do channel sem buffer mostrado acima, onde uma msg só entra para o channel se tiver alguém consumindo do outro lado.

O channel com buffer é util, entre outras coisas, quando queremos processar uma determinada quantidade de mensagens de cada vez.

Como Funciona Por Baixo do Capô ?

Antes de tirarmos o véu, um disclaimer, essa explicação é bem simplificada, mas nos ajuda a entender um pouco da magia por trás dos channels

Como sabemos channels são uma referencia para uma outra estrutura, essa estrutura se chana hchan e tem mais ou menos esta estrutura.

type hchan struct {
    qcount   uint           // número de elementos no canal
    dataqsiz uint           // tamanho do buffer (0 para unbuffered)
    buf      unsafe.Pointer // ponteiro para o array circular
    sendx    uint           // índice de escrita
    recvx    uint           // índice de leitura
    recvq    waitq          // fila de goroutines esperando para receber
    sendq    waitq          // fila de goroutines esperando para enviar
    lock     mutex          // proteção contra acesso concorrente
}

Basicamente temos uma fila de goroutines esperando pra enviar, uma de goroutines esperando pra receber, o buffer, uma mutex para proteger contra acesso concorrente e inteiros para controlar indices e o número de elementos no canal.

Inserindo no Channel

Quando inserimos algo no channel estamos querendo enviar algo pelo channel , o processo é mais ou menos esse.

A primeira coisa que é feita ao receber um valor é checar a fila de goroutines esperando para receber algo, essa fila é o campo recvq da struct hchan, caso tenha alguma goroutine esperando para receber, o valor vai pra ela.

Caso contrario, caso a fila recvq esteja vazia, ou seja, não tem ninguém pra receber, o runtime vai checar se o buffer esta cheio, olhando para o campo dataqsiz. Aqui os channels sem buffer apenas tem um buffer de tamanho 0, sendo assim o buffer sempre estará cheio nesse caso. Se o buffer ainda tiver espaço o runtime vai armazenar o valor no buffer no espaço disponível e vai incrementar o índice de escrita sendx e incrementar o qcount.

Caso não tiver ninguém pra receber o valor e o buffer estiver cheio a goroutine que enviou o valor é colocada em “pausa” e enviada para a fila de goroutines em espera para enviar, ou seja sendq.

Lendo do Channel

Para ler do channel o processo é quase o inverso, a primeira coisa que precisamos para poder ler é ter um valor para ler, e o primeiro lugar que o runtime busca esse valor para ler é o buffer, caso tenha um valor no buffer ele é retornado, o recvx é incrementado e o qcount é decrementado.

Caso não tenha valor no buffer o runtime olha pra sendq, que é a fila de goroutines esperando para enviar uma mensagem, se tiver alguma goroutine esperando o runtime acorda ela, recebe a mensagem que ela está querendo enviar e envia para a goroutine que está lendo do canal.

Caso a sendq esteja vazia o runtime pausa a goroutine que está querendo ler e a insere na fila de goroutines querendo receber, a recvq.

Implementando Um Channel Na Unha

Explicar algo para alguém, sem dúvidas, sempre é uma boa forma de aprender, porém por a mão na massa também é um esforço extremamente valoroso, então vamos tentar implementar nosso próprio channel manualmente.

Obviamente nossa implementação é apenas um estudo, sendo bem mais simples e de longe bem menos performática que a implementação da linguagem em si, mas serve ao nosso propósito, então vamos lá.

A primeira coisa a se fazer é criar uma struct que será o nosso channel

import "sync"

type PirateChan struct {
    buf      []string   // nosso buffer, por hora nosso channel só recebe strings
    cap      int        // a capacidade do nosso channel
    mu       sync.Mutex // ──╮
    notEmpty *sync.Cond // ────❯  vamos entender esses tres campos mais à frente
    notFull  *sync.Cond // ──╯
    closed   bool       // se o canal ainda está aberto ou não
}

Como não fazemos parte da standart library não vamos poder iniciar o nosso channel com o make, então vamos criar um construtor pra ele.


func New(capacity int) *PirateChan {
    pch := &PirateChan{
        buf: make([]string, 0, capacity),
        cap: capacity,
    }

    pch.notEmpty = sync.NewCond(&pch.mu)
    pch.notFull = sync.NewCond(&pch.mu)

    return pch
}

No nosso construtor, primeiramente inicializamos nosso buffer com zero elementos e um capacity recebido por parâmetro

Posteriormente inicializamos os nossos Cond, que é uma estrutura de dados não tão comum de ser vista em go, ao menos na minha experiência, então acho que podemos abrir um parêntese para falar um pouco sobre como eles funcionam e como estamos usando aqui.

Cond 🧛🏾‍♂️

Quando queremos que uma goroutine espere, ou seja, fique congelada até que uma condição seja verdadeira, podemos, entre outras soluções, usar a forma mais básica que talvez seja usar um for com um time.Sleep, porém isso não é lá muito performático pois estamos gastando CPU enquanto nada acontece, não seria mais elegante se pudéssemos de fato bloquear a goroutine e acorda-la quando o que estivermos esperando que mude tenha de fato mudado?!

Nesse ponto entra o Cond que é uma estrutura de dados que fornece sincronicidade baseada em uma condição. Por exemplo

var mu sync.Mutex
var cond = sync.NewCond(&mu)
var septemberEnds = false

func billieJoe() {
    mu.Lock()
    for !septemberEnds {
        cond.Wait() // espera até setemberEnds
    }
    fmt.Println("Yes setembro acabou...")
    mu.Unlock()
}

func main() {
    go billieJoe()

    time.Sleep(1 * time.Second)

    mu.Lock()
    septemberEnds = true
    cond.Signal() // avisa o billie joe que setembro acabou
    mu.Unlock()
}

No nosso exemplo disparamos um worker, nesse caso o billieJoe, porém setembro ainda não chegou ao fim, então ele dorme até que alteremos o valor da variável septemberEnds para true e depois enviamos o cond.Signal() para avisa-lo que setembro acabou.

O cond.Wait() fica dentro de um loop com a checagem se setembro chegou ao fim, pois como podemos perceber no main, é possível enviarmos um cond.Signal() mesmo que a condição septemberEnds ainda seja false, o que não é o caso aqui, por isso o for faz com que, mesmo que o wait desperte a goroutine, a checagem da condição aconteça novamente antes de seguir o fluxo a frente.

Como também podemos ver no exemplo, o Cond controla apenas o fluxo de espera e da sinalização entre as goroutines, por isso ele sempre anda acompanhado de um Locker que é uma interface assinada pela Mutex, assim o Cond sempre precisa de alguém que proteja o acesso aos dados, o que geralmente, incluindo o nosso channel pirata, é feito por uma Mutex.

Voltando ao Nosso Channel

Agora temos a struct e o construtor para nosso channel pirata, precisamos criar alguma forma de inserirmos, lermos e fecharmos nosso channel.

Inserindo

Para inserirmos vamos criar um método na struct PirateChan chamado Send

func (pch *PirateChan) Send(v string) {
    pch.mu.Lock()
    defer pch.mu.Unlock()

    for len(pch.buf) == pch.cap && !pch.closed {
        pch.notFull.Wait()
    }

    if pch.closed {
        panic("send on closed pirate channel")
    }

    pch.buf = append(pch.buf, v)
    pch.notEmpty.Signal()
}

A primeira coisa que fazemos nesse método é garantir o acesso exclusivo a estrutura do channel, por isso fechamos a mutex e damos um defer para abri-la ao final.

Após isso iniciamos um for onde checamos se o buffer está cheio e se o canal ainda não está fechado, e caso essas condições sejam verdadeiras, significa que não podemos mais adicionar algo no buffer, logo a goroutine que está tentando inserir no channel precisa ficar em espera, por isso chamamos o Wait do cond notFull que libera temporariamente a mutex, liberando o acesso a estrutura do channel e deixando a goroutine em wait, semanticamente poderíamos ler esse for da seguinte maneira

Aguarde pacientemente até que o buffer não esteja cheio

E o for, assim como no exemplo do Billie Joe, faz com que, caso o wait desperte e o buffer ainda esteja cheio, a condição seja reavaliada entrando novamente em wait

Após o for temos um if checando se o canal esta fechado, caso esteja ele retorna um panic dizendo que estamos tentando inserir em um canal fechado.

Se chegarmos até esse ponto do código o que faremos é inserir no buffer com append e acordamos alguma goroutine que esteja tentando receber uma mensagem chamando Signal do cond notEmpty, semanticamente estamos dizendo.

Ei, tem algo novo aqui no buffer, venha buscar

Recebendo

Para recebermos os dados vamos criar na nossa struct do channel pirata um método que chamamos de Receive, chamando esse método poderemos ler do buffer ou receber de uma outra goroutine que está tentando enviar.

func (pch *PirateChan) Receive() (string, bool) {
    pch.mu.Lock()
    defer pch.mu.Unlock()

    for len(pch.buf) == 0 && !pch.closed {
        pch.notEmpty.Wait()
    }

    if len(pch.buf) == 0 && pch.closed {
        return "", false
    }

    v := pch.buf[0]
    pch.buf = pch.buf[1:]
    pch.notFull.Signal()

    return v, true
}

Assim como no método para enviar, aqui também bloqueamos o acesso ao channel por meio de uma mutex.

Também fazemos a checagem se o channel está com o buffer vazio e ainda está aberto, caso sim colocamos essa goroutine em espera pois não tem nada pra receber, semanticamente podemos ler a linha do wait como

Espere até não estar vazio

Posteriormente vamos checar se o canal esta vazio e fechado, nesse caso é o final da linha e não vamos nunca receber nada desse channel, afinal ele acabou.

E se chegarmos a esse ponto, significa que temos realmente um valor para ler do channel, e esse processo é o seguinte, lemos do índice 0 do buffer, depois removemos esse item do buffer reatribuindo ele como sendo ele mesmo do indice 1 pra frente.

Posteriormente enviamos um Signal no cond notFull, ou seja semanticamente

Ei, não estamos cheios, pode tentar enviar novamente

Fechando o Channel

Agora precisamos de uma forma de fechar o nosso channel e liberar o que for preciso ser liberado, então vamos implementar o método Close na struct do nosso channel

func (pch *PirateChan) Close() {
    pch.mu.Lock()
    defer pch.mu.Unlock()

    pch.closed = true

    pch.notEmpty.Broadcast()
    pch.notFull.Broadcast()
}

Assim como todos os outros método usamos a mutex mu para bloquear o acesso a estrutura do channel.

Depois setamos o closed pra true, e logo depois fazemos o Broadcast de notEmpty e de notFull para que caso alguma goroutine ainda esteja bloqueada esperando sejam acordadas e percebam, graças ao tratamento nos métodos Send e Receive, que o closed agora é true então precisam dar o tratamento adequado para este caso.

Usamos Broadcast aqui, pois o objetivo é não deixar nenhuma goroutine para trás, acordando todas.

Utilizado o Channel Pirata 🏴‍☠️

Para vermos a utilização do nosso channel na prática podemos reescrever o nosso exemplo do range mas agora com o nosso channel

func main() {
    pirateCh := pirateChan.New(2)

    go produceWork(pirateCh)

    doWork(pirateCh)
}

func doWork(ch *pirateChan.PirateChan) {
    for {
        s, ok := ch.Receive()
        if !ok {
            break
        }
        fmt.Println("recebemos pelo channel", s)
    }
}

func produceWork(ch *pirateChan.PirateChan) {
    for i := 0; i < 5; i++ {
        fmt.Println("produzindo trabalho", i)
        ch.Send(fmt.Sprintf("numero %d", i))
    }
    ch.Close()
}

O output, caso tenha curiosidade ficou assim

produzindo trabalho 0
produzindo trabalho 1
produzindo trabalho 2
recebemos pelo channel numero 0
produzindo trabalho 3
produzindo trabalho 4
recebemos pelo channel numero 1
recebemos pelo channel numero 2
recebemos pelo channel numero 3
recebemos pelo channel numero 4

Conclusão

Channels são uma ferramenta poderosa e, se estudarmos eles mais a fundo, podemos ver o quanto a implementação de go do CSP e como a própria linguagem em si são poderosos e simples, logicamente esse artigo é um olhar muito superficial sobre o tema, porém espero mostrar que por mais complexo que um tema possa parecer sempre podemos nos debruçar sobre ele um pouco e entender mais e mais.

Em um certo momento confesso que me passou pela cabeça que um channel também é uma estrutura de dados compartilhada entre goroutines diferentes, mas implementando pude ver que apesar de sim, ser uma estrutura de dados dividida entre goroutines, o principio de funcionamento muda, o que muda a forma como pensamos a utilização dessa ferramenta, saio desse artigo com fome de entender mais sobre a linguagem e sobre computação em si, e espero que de qualquer forma que seja esse texto, ou alguma parte dele que seja, possa ajudar alguém.

Agora que desvendamos a estrutura dos channels por dentro, podemos não apenas usá-los com mais confiança, mas também entender as decisões que fazem Go ser a linguagem simples e poderosa que é.

0
Subscribe to my newsletter

Read articles from Jose Paulo directly inside your inbox. Subscribe to the newsletter, and don't miss out.

Written by

Jose Paulo
Jose Paulo