Red-Black Trees augmentada com Order Statistics: Uma Abordagem Prática

Existem certos tipos de problemas em programação que envolvem containers nos quais os que são disponibilizados como padrão pela maioria das linguagens tornam a solução mais complexa. Um exemplo clássico é a necessidade de inserir um valor em um container de maneira ordenada por uma chave e, ao mesmo tempo, recuperar esse valor por índice.

Um caso básico e abstrato seria inserir inteiros em um container de forma ordenada e recuperar o valor na posição Θ(log n). Usando como referência o C++ e os containers da STL, não há uma estrutura que forneça essas duas funcionalidades de maneira eficiente.

Se utilizarmos std::set para manter os itens ordenados, a inserção ocorre em Θ(log n) , mas a busca pelo item na posição \( x \) exige uma iteração do início até \( x \), resultando em uma complexidade de Θ(n).

Uma abordagem alternativa seria usar std::vector, onde a recuperação de um item na posição \( x \) ocorre em Θ(1). No entanto, a inserção ordenada exigiria a busca da posição correta via busca binária Θ(log n), seguida da realocação dos elementos subsequentes, o que tem um custo de Θ(n-x), tornando a inserção menos eficiente.

No livro Introduction to Algorithms (Cormen et al.), é apresentado o conceito de Augmenting Data Structures, uma técnica que consiste em escolher uma estrutura de dados base já conhecida, como arrays, listas ou árvores binárias, e adicionar propriedades extras a essas estruturas para desenvolver novas funcionalidades.

No mesmo capítulo, para ilustrar essa técnica, os autores utilizam uma Red-Black Tree para implementar a funcionalidade de Order Statistics, que permite recuperar um item pelo seu rank ou determinar, a partir de uma chave, qual é o seu rank dentro da estrutura.

Figura retirada de "Introduction to Algorithms", Cormen et al., 3ª edição, Capítulo 14.

Meu objetivo com este post não é demonstrar como implementar Order Statistics em uma Red-Black Tree, pois os próprios autores já fazem isso de forma detalhada. Em vez disso, quero focar no uso prático dessa estrutura. No entanto, antes disso, faremos uma breve passagem pelas funcionalidades adicionadas.

De forma simplificada as duas novas funcionalidades são:

Key select(size_t _rank);
size_t rank(Key _key);

A primeira função, select, recebe um rank como parâmetro e retorna a chave correspondente. Já a segunda, rank, faz o oposto: recebe uma chave e retorna o seu rank na estrutura.

Para que isso seja possível, é necessário adicionar ao nó da árvore uma propriedade extra que representa o tamanho da subárvore enraizada em um nó \( x \). Esse tamanho é calculado da seguinte forma:

$$size = x.left.size + x.right.size + 1$$

Esse valor deve ser atualizado a cada inserção e remoção na árvore sem afetar a complexidade do algoritmo base.

Com essa informação extra, podemos implementar as duas novas funções, permitindo calcular o rank de uma chave em Θ(log n), assim como encontrar a chave correspondente a um determinado rank também em Θ(log n).

Abaixo, segue o algoritmo de implementação dessas funções:

iterator select(size_t _rank)
{
    if (m_root == m_nil || _rank > m_size)
    {
        return end();
    }
    return select(_rank, m_root);
}

iterator select(size_t _rank, node_ptr _node)
{
    size_t r = _node->m_left->m_size + 1;
    if (r == _rank)
    {
        return iterator(_node);
    }
    if (_rank < r)
    {
        return select(_rank, _node->m_left);
    }
    else
    {
        return select(_rank - r, _node->m_right);
    }
}

A primeira função, select, verifica se o rank solicitado está dentro do intervalo válido do container, ou seja, o conjunto de ranks válidos varia de 1 até o tamanho do container.

A segunda função inicia a busca pelo rank solicitado a partir do nó principal. A primeira linha:

size_t r = node->m_left->m_size + 1;

calcula o rank do nó atual, pois todos os nós à esquerda dele possuem valores menores.

  • Se o rank do nó atual for igual ao rank solicitado, encontramos o nó.

  • Se o rank solicitado for menor, devemos seguir para a esquerda, chamando a função select recursivamente com o nó à esquerda.

  • Se o rank solicitado for maior, seguimos para a direita. No entanto, ao mover para a direita, precisamos subtrair do rank procurado o rank do nó atual, pois já descartamos todos os elementos que estavam antes dele.

std::pair<size_t, iterator> rank(const Key _key) {
    auto node = m_root;
    size_t rank = 0;
    while (node != m_nil && node->key != _key)
    {
        if (_key < node->key)
        {
            node = node->m_left;
        }
        else
        {
            rank += node->m_left->m_size + 1;
            node = node->m_right;
        }
    }
    if (node != m_nil)
    {
        rank += node->m_left->m_size + 1;
    }
    else
    {
            rank = 0;
    }
    return { rank, iterator(node) };
}

A função rank também é bastante simples, sendo essencialmente uma busca binária tradicional. O rank começa em 0 e é incrementado caso a chave buscada seja maior que a do nó atual. Se o nó for encontrado, também somamos o rank do nó atual à variável de retorno.

Com essas duas funções extras, podemos implementar um custom map parecido com a STL, já que o std::map da STL também é implementado utilizando uma Red-Black Tree, com a seguinte interface:


class tree_iterator;

template<typename Key, typename T, typename Compare = std::less<Key> /*...*/>
class os_map {
public:
   using value_type = std::pair<Key, T>;
     using iterator = tree_iterator;
     using key_compare = Compare;
     using rank_type = size_t;

   iterator begin();
   iterator end();
   std::pair<rank_type, iterator> insert(value_type&& _key);
   std::pair<rank_type, iterator> erase(const Key& _key);
   std::pair<rank_type, iterator> rank(const Key& _key);
   iterator select(rank_type _rank);
   //...//
};

Exemplo prático de utilização.

Nosso caso de uso será a implementação de um livro de ofertas que utiliza o preço como chave e armazena o volume correspondente. A atualização do livro de ofertas ocorre por meio de dois tipos de eventos: insert e remove, que funcionam da seguinte forma:

  • Se o evento for um insert e o preço ainda não estiver presente no livro de ofertas, inserimos esse novo valor. Caso o preço já exista, simplesmente incrementamos o volume atual desse preço com o valor recebido no evento.

  • No caso de um remove, se o volume associado ao preço for igual ao volume armazenado no container, removemos o item. Se for menor, apenas decrementamos o volume correspondente.

Até aqui, um std::map resolveria nosso problema. No entanto, esse livro de ofertas pode ser assinado por clientes, e na assinatura, o cliente pode definir a profundidade que deseja receber. Ou seja, se um cliente assinar o livro com uma profundidade de 10, ele receberá eventos apenas para atualizações que ocorram entre as posições 1 e 10. Caso a atualização ocorra em uma posição superior a 10, o cliente não receberá o evento.

A geração de eventos para os clientes deve seguir as seguintes regras:

  • Se ocorrer somente uma atualização (seja insert ou delete) do volume, enviamos ao cliente um evento com o volume atualizado para aquele preço.

  • Se um novo preço for inserido dentro do intervalo assinado, devemos enviar um evento de insert para esse novo elemento. Caso, após a inserção, o book ultrapasse a profundidade assinada, também devemos enviar um evento de delete para a posição 11 (que corresponde à posição 10 no book do cliente), garantindo que o limite de profundidade seja respeitado.

  • Se ocorrer uma deleção dentro do intervalo assinado e, após a remoção, o tamanho do container ainda for maior ou igual à profundidade assinada, devemos enviar um evento de insert para a nova posição 10.

No caso a implementação desse livros de ofertas ficaria da seguinte maneira:

#pragma once 
#include "os_map.hpp"
#include <vector>
#include <utility>
#include <set>
#include <stdexcept>

template<typename Key, typename T, typename Compare = std::less<Key>>
class aggregate_book {
public:

    using map_type = os_map<Key, T, Compare>;
    using value_type = typename map_type::value_type; 
    using depth_container = std::set<size_t>;

    struct Event {
        using iterator_depth = depth_container::iterator;
        Event(depth_container::iterator _begin, depth_container::iterator _end, 
                    const value_type&& _data) : begin(_begin), end(_end), data(_data) {}
        iterator_depth begin;
        iterator_depth end;
        value_type data;
    };
    void susbscribe_depth(size_t _depth) {m_depths.insert(_depth)};
    void unsubscribe_depth(size_t _depth) {m_depth.erase(_depth)};
    std::vector<Event> insert_or_update(value_type&& _value);
    std::vector<Event> delete_or_update(value_type&& _value);
private:
    depth_container::iterator get_depth_iterator(size_t _depth);
    map_type m_map;
    depth_container m_depths;
};

Na classe aggregate_book, temos duas propriedades principais:

  • depth_container, que armazenará as profundidades assinadas pelos clientes.

  • map_type, que, nesse caso, será o nosso os_map, onde iremos armazenar os preços e seus respectivos volumes.

A struct event representa os eventos que devem ser repassados aos clientes após uma inserção ou remoção. Como alguns eventos são comuns a mais de uma profundidade, o evento possui duas propriedades: begin e end, que indicam o intervalo de profundidade ao qual o evento pertence. Além disso, há o value_type, que representa o evento em si.

A seguir, veremos em detalhes as implementações das funções get_depth_iterator, insert_or_update e delete_or_update.

std::set<size_t>::iterator get_depth_iterator(size_t _depth)
{
        auto itDepth = m_depths.upper_bound(_depth);
        if (itDepth != m_depths.begin())
        {
            auto tmp = itDepth;
            --tmp;
            if (*tmp == _depth)
            {
                itDepth = tmp;
            }
        }
        return itDepth;
}

A função get_depth_iterator tem como objetivo retornar um iterador que aponta para a primeira profundidade assinada que seja igual ou maior que um determinado rank. Como estamos buscando as profundidades assinadas dentro de um certo rank, seguimos as regras de assinatura:

  • As profundidades maiores que o rank modificado receberão atualizações.

  • As profundidades menores que o rank informado não receberão nenhuma atualização.

Caso o retorno da função seja igual a m_depths.end(), nenhum evento será gerado.

std::vector<Event> insert_or_update(std::pair<Key, Value> _value)
    {
        std::vector<Event> ret;
        auto entry = m_map.rank(_value.first);
        if (entry.second == m_map.end())
        {
            entry = m_map.insert(std::move(_value));
            auto itDepth = get_depth_iterator(entry.first);
            if (itDepth != m_depths.end())
            {

                for (auto it = itDepth; it != m_depths.end(); ++it)
                {
                    auto delEntry = m_map.select((*it) + 1);

                    if (delEntry != m_map.end())
                    {
                        auto last = it;
                        ++last;
                        ret.push_back({ it, last, { delEntry->first, delEntry->second - delEntry->second } });
                    }
                }

                ret.push_back({itDepth, m_depths.end(), {entry.second->first, entry.second->second}});
            }
        }
        else
        {
            entry.second->second += _value.second;
            auto itDepth = get_depth_iterator(entry.first);
            if (itDepth != m_depths.end())
            {
                ret.push_back({itDepth, m_depths.end(), {entry.second->first, entry.second->second}});
            }
        }
        return ret;
    }

A função insert_or_update começa verificando se já existe um valor com o mesmo preço no container. Caso o valor não seja encontrado, ele entra na condição do primeiro if e insere o novo item no mapa. O método insert retorna o rank e o iterator do item recém-adicionado.

Com o rank, buscamos as profundidades que são maiores ou iguais ao rank inserido. Se essas profundidades existirem, devemos gerar eventos de delete para remover a última posição de cada profundidade. Esse processo ocorre dentro do segundo if:

  • Para cada profundidade maior ou igual ao rank inserido, verificamos se existe um item na posição profundidade + 1 no mapa (pois um novo item foi inserido anteriormente).

  • Caso esse item exista, geramos um evento de delete, enviando ao cliente um evento com o preço e o volume igual a 0, indicando a remoção do item.

  • Após gerar todos os eventos de delete, criamos um evento de insert para todas as profundidades que estão acima do rank inserido.

Se o item já existir no container, o volume é acumulado e um evento com o volume atualizado é gerado para todas as profundidades iguais ou maiores ao rank atualizado.

std::vector<Event> delete_or_update(std::pair<Key, Value> _value)
    {
        std::vector<Event> ret;
        auto entry = m_map.rank(_value.first);
        if (entry.second == m_map.end())
        {
            throw std::runtime_error("try delet a inexistent item");
        }
        entry.second->second -= _value.second;
        auto itDepth = get_depth_iterator(entry.first);
        if (itDepth != m_depths.end())
        {
            ret.push_back({itDepth, m_depths.end(), {entry.second->first, entry.second->second}});
        }
        if (entry.second->second == (entry.second->second - entry.second->second))
        {
            m_map.erase(_value.first);

            for (auto it = itDepth; it != m_depths.end(); ++it)
            {
                auto insertEntry = m_map.select(*it);

                if (insertEntry != m_map.end())
                {
                    auto last = it;
                    ++last;
                    ret.push_back({ it, last, { insertEntry->first, insertEntry->second } });
                }
            }
        }
        return ret;
    }

A função delete_or_update segue um conceito similar ao insert_or_update, porém no fluxo oposto, com algumas nuances.

O primeiro passo é encontrar o item que queremos deletar, utilizando o método rank. Após localizar o item, decrementamos seu volume com o valor passado no evento. Em seguida, verificamos se existe alguma profundidade que será afetada por essa atualização. Se houver profundidades impactadas, o primeiro passo é gerar um evento com o volume atualizado para todas essas profundidades.

Depois, verificamos se o volume resultante é igual ao volume da posição afetada menos ele mesmo, o que, de forma indireta, verifica se o volume chegou a zero. Caso isso ocorra, removemos o item do container e geramos eventos de insert para a última posição de cada profundidade. Esse processo garante a consistência do livro de ofertas.

Análise de performance dos containers.

Nesta primeira análise, comparo o desempenho do meu container os_map com o std::map, inserindo e removendo itens. O objetivo dessa comparação é verificar se o comportamento segue a complexidade esperada do container base, que deve se manter em Θ(log n), além de avaliar o quão distante a implementação inicial está em relação à versão otimizada do std::map. Pelo gráfico, podemos observar que a adição da propriedade extra, bem como sua atualização durante inserções e remoções, não afetou a complexidade.

No segundo gráfico, a análise foca nas novas funcionalidades select e rank do os_map, que também demonstraram ser bastante eficientes.

Por fim, analisamos o desempenho do aggregated_map em relação ao número de assinaturas no container. Nesse caso, a complexidade se torna linear em função do número de assinaturas em diferentes profundidades do livro de ofertas, o que é esperado.

Conclusão

Fazer o augmenting de um container base para adicionar novas funcionalidades pode facilitar significativamente o desenvolvimento de novos algoritmos, eliminando a necessidade de soluções complexas e tornando o código mais claro e compreensível. No entanto, há um esforço inicial considerável ao implementar o container base do zero, o que pode tornar a tarefa mais trabalhosa no início. Porém, uma vez finalizado, esse esforço traz benefícios a longo prazo, simplificando o desenvolvimento futuro.

Referência:

Cormen, T. H., Leiserson, C. E., Rivest, R. L., & Stein, C. (2009). Introduction to Algorithms (3rd ed.). The MIT Press.

Wikipedia. (2025). Order statistic. Disponível em: https://en.wikipedia.org/wiki/Order_statistic

0
Subscribe to my newsletter

Read articles from Fábio Silva Santana directly inside your inbox. Subscribe to the newsletter, and don't miss out.

Written by

Fábio Silva Santana
Fábio Silva Santana