Building a Command-Line SQL Engine for Linux Data Pipelines

Dove-Wing

As a Linux systems engineer, I've often needed to analyze structured data on the fly. When processing logs, system metrics, or CSV exports, I wanted something that would integrate seamlessly with existing shell scripts and pipeline workflows without the overhead of setting up a traditional database server.

This article details my journey implementing a lightweight SQL engine for Linux command-line environments.

The Unix Philosophy Meets SQL

In Linux environments, we're accustomed to powerful one-liners using tools like grep, awk, and sed. However, these tools lack SQL's expressiveness for complex analytics. My goal was to create a command-line SQL engine that embraces Unix philosophy:

  1. Do one thing well

  2. Expect the output of one program to become the input to another

  3. Handle text streams as a universal interface

Core Architecture Design

After examining high-performance embedded database implementations, I settled on an architecture that balances performance with simplicity. Let's explore the key components:

Command-Line Interface Integration

The entry point supports standard Unix I/O patterns:

# Process a local CSV file
csv-sql "SELECT * FROM 'data.csv' WHERE value > 100" > filtered.csv

# Process stdin from a pipeline
cat data.csv | csv-sql "SELECT id, SUM(value) FROM stdin GROUP BY id"

# Process remote data
curl https://example.com/data.csv | csv-sql "SELECT * FROM stdin LIMIT 10"

# Join data from multiple sources
csv-sql "SELECT a.timestamp, a.status, b.user 
         FROM 'server_logs.csv' a 
         JOIN 'users.csv' b 
         ON a.user_id = b.id 
         WHERE a.status = 404"

Modeled on how modern embedded SQL engines handle their command lines, the argument parsing is a simple loop over argv:

int main(int argc, char *argv[]) {
    Config config;  // Type renamed so it no longer shares a name with the variable
    memset(&config, 0, sizeof(config));

    // Default configuration
    config.format = FORMAT_CSV;
    config.stdin_table = "stdin";

    // Parse command line arguments
    for (int i = 1; i < argc; i++) {
        if (strcmp(argv[i], "-f") == 0 || strcmp(argv[i], "--format") == 0) {
            if (i + 1 < argc) {
                if (strcmp(argv[i+1], "csv") == 0) config.format = FORMAT_CSV;
                else if (strcmp(argv[i+1], "json") == 0) config.format = FORMAT_JSON;
                else if (strcmp(argv[i+1], "table") == 0) config.format = FORMAT_TABLE;
                i++;
            }
        } else if (strcmp(argv[i], "--stdin-name") == 0) {
            if (i + 1 < argc) {
                config.stdin_table = argv[i+1];
                i++;
            }
        } else if (config.query == NULL) {
            config.query = argv[i];
        }
    }

    if (config.query == NULL) {
        fprintf(stderr, "Usage: csv-sql [OPTIONS] \"SQL QUERY\"\n");
        return 1;
    }

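    // Check whether stdin is a pipe or a redirected regular file.
    // A FIFO-only test would miss `csv-sql "..." < data.csv`.
    struct stat st;
    bool has_stdin = false;
    if (fstat(STDIN_FILENO, &st) == 0 &&
        (S_ISFIFO(st.st_mode) || S_ISREG(st.st_mode))) {
        has_stdin = true;
    }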

    // Initialize database and run query
    DatabaseInstance* db = db_init();
    if (has_stdin) {
        db_register_stdin(db, config.stdin_table);
    }

    QueryResult* result = db_execute_query(db, config.query);
    output_result(result, config.format);

    db_cleanup(db);
    return 0;
}

This approach handles both interactive and non-interactive use: it detects whether input is being piped in and, if so, registers standard input as a queryable table.
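The input check can be isolated into a small helper. The following is a minimal sketch rather than the engine's actual code: `InputKind` and `classify_input` are hypothetical names introduced here for illustration. It distinguishes a terminal, a pipe, and a redirected regular file, so a caller can decide whether to register stdin as a table.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <sys/stat.h>
#include <unistd.h>

/* Hypothetical classification of what a file descriptor is attached to. */
typedef enum {
    INPUT_TERMINAL,  /* interactive terminal: nothing was piped in */
    INPUT_PIPE,      /* FIFO or pipe, e.g. `cat data.csv | csv-sql ...` */
    INPUT_FILE,      /* regular file, e.g. `csv-sql ... < data.csv` */
    INPUT_OTHER      /* socket, device, or fstat() failure */
} InputKind;

InputKind classify_input(int fd) {
    struct stat st;
    assert(fd >= 0);
    if (isatty(fd)) return INPUT_TERMINAL;
    if (fstat(fd, &st) != 0) return INPUT_OTHER;
    if (S_ISFIFO(st.st_mode)) return INPUT_PIPE;
    if (S_ISREG(st.st_mode)) return INPUT_FILE;
    return INPUT_OTHER;
}
```

A caller would typically treat both `INPUT_PIPE` and `INPUT_FILE` as "stdin has data" and only skip registration for `INPUT_TERMINAL`.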

Streaming Data Source Management

The key to working with Unix pipes is treating them as first-class data sources. My implementation uses a virtual table system to abstract various data sources:

// Forward declaration so the vtable can refer to DataSource
typedef struct DataSource DataSource;

// Virtual function table for data sources
typedef struct {
    bool (*next_batch)(DataSource* source, DataChunk* output, size_t desired_count);
    bool (*rewind)(DataSource* source);
    void (*close)(DataSource* source);
} DataSourceVTable;

struct DataSource {
    DataSourceType type;      // FILE, STDIN, HTTP, etc.
    char* name;               // Logical name in queries
    void* handle;             // Source-specific handle
    bool is_pipe;             // Is this a Unix pipe?
    TableSchema* schema;      // Inferred schema
    DataSourceVTable* vtable; // Function pointers for operations
};
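To make the vtable idea concrete, here is a small self-contained sketch of one implementation behind such an interface. It is deliberately simplified and is not the engine's code: `LineSource`, `LineSourceVTable`, and `line_source_from_memory` are hypothetical stand-ins, and a "batch" is just a window of lines over an in-memory array instead of a `DataChunk`.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef struct LineSource LineSource;

/* Operations every source must provide (simplified from the article's vtable). */
typedef struct {
    size_t (*next_batch)(LineSource* src, const char** out, size_t max_lines);
    bool (*rewind)(LineSource* src);
    void (*close)(LineSource* src);
} LineSourceVTable;

struct LineSource {
    const LineSourceVTable* vtable;
    const char* const* lines;  /* backing data (in memory for this sketch) */
    size_t line_count;
    size_t pos;                /* next line to hand out */
    bool rewindable;           /* false would model a pipe */
};

static size_t mem_next_batch(LineSource* src, const char** out, size_t max_lines) {
    size_t n = 0;
    while (n < max_lines && src->pos < src->line_count) {
        out[n++] = src->lines[src->pos++];
    }
    return n;  /* 0 means the source is exhausted */
}

static bool mem_rewind(LineSource* src) {
    if (!src->rewindable) return false;
    src->pos = 0;
    return true;
}

static void mem_close(LineSource* src) {
    src->pos = src->line_count;  /* nothing to release for in-memory data */
}

static const LineSourceVTable MEM_VTABLE = { mem_next_batch, mem_rewind, mem_close };

LineSource line_source_from_memory(const char* const* lines, size_t count) {
    LineSource src = { &MEM_VTABLE, lines, count, 0, true };
    assert(lines != NULL || count == 0);
    return src;
}
```

Callers only ever go through `src.vtable->next_batch(...)`, so a file-backed or stdin-backed source can be swapped in without touching the code that consumes batches.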

For stdin handling, I implemented a specialized reader that can collect data in chunks:

bool stdin_next_batch(DataSource* source, DataChunk* output, size_t desired_count) {
    FILE* stdin_handle = (FILE*)source->handle;
    char* line = NULL;
    size_t len = 0;
    ssize_t read;

    size_t lines_read = 0;
    bool eof_reached = false;

    // Allocate column vectors for this chunk
    Vector** vectors = malloc(sizeof(Vector*) * source->schema->column_count);
    for (size_t i = 0; i < source->schema->column_count; i++) {
        vectors[i] = vector_create(source->schema->column_types[i], desired_count);
    }

    while (lines_read < desired_count) {
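        // Check if we need to wait for more input (for pipes).
        // Caveat: select() only sees the kernel pipe buffer, not lines that
        // stdio has already buffered inside stdin_handle.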
        if (source->is_pipe) {
            fd_set read_fds;
            struct timeval tv;
            FD_ZERO(&read_fds);
            FD_SET(STDIN_FILENO, &read_fds);
            tv.tv_sec = 0;
            tv.tv_usec = 100000; // 100ms timeout

            int ret = select(STDIN_FILENO + 1, &read_fds, NULL, NULL, &tv);
            if (ret <= 0) {
                // No data available yet, but not EOF
                break;
            }
        }

        read = getline(&line, &len, stdin_handle);
        if (read == -1) {
            eof_reached = true;
            break;
        }

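        // Parse CSV line into column values. Use a local count so a ragged
        // row cannot overwrite the schema's column count.
        size_t field_count = 0;
        char** values = parse_csv_line(line, &field_count);
        if (values == NULL || field_count != source->schema->column_count) {
            // Skip malformed rows instead of indexing past the parsed fields
            if (values != NULL) free_string_array(values, field_count);
            continue;
        }

        // Add values to vectors
        for (size_t i = 0; i < field_count; i++) {
            vector_append_from_string(vectors[i], values[i], lines_read);
            free(values[i]);
        }
        free(values);

        lines_read++;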
    }

    free(line);

    // Set up the data chunk
    output->column_count = source->schema->column_count;
    output->vectors = vectors;
    output->count = lines_read;

    return lines_read > 0 || !eof_reached;
}

This approach reads streaming input in bounded batches. On a pipe it waits up to 100 ms for more data and then returns a partial batch rather than blocking indefinitely, so memory usage is bounded by the batch size.
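The reader above relies on a `parse_csv_line` helper that is not shown. As a rough idea of what such a helper involves, here is a minimal sketch under stated assumptions: `split_csv_line` and `free_fields` are hypothetical names, the signature differs from the article's, and only double-quoted fields with `""` escapes are supported (no multi-line fields).

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Free an array of fields returned by split_csv_line(). */
void free_fields(char** fields, size_t count) {
    for (size_t i = 0; i < count; i++) free(fields[i]);
    free(fields);
}

/* Split one CSV line into newly allocated fields.
 * Handles double-quoted fields and "" escapes; strips a trailing newline.
 * Returns NULL on allocation failure. */
char** split_csv_line(const char* line, char delim, size_t* out_count) {
    size_t len, cap = 8, count = 0, pos = 0;
    char** fields;
    assert(line != NULL && out_count != NULL);

    len = strlen(line);
    while (len > 0 && (line[len - 1] == '\n' || line[len - 1] == '\r')) len--;
    fields = malloc(cap * sizeof(char*));
    if (fields == NULL) return NULL;

    for (;;) {
        /* A field can never be longer than the remaining input. */
        char* field = malloc(len - pos + 1);
        size_t out = 0;
        int in_quotes = 0;
        if (field == NULL) { free_fields(fields, count); return NULL; }

        while (pos < len) {
            char c = line[pos];
            if (in_quotes) {
                if (c == '"' && pos + 1 < len && line[pos + 1] == '"') {
                    field[out++] = '"';  /* escaped quote inside a quoted field */
                    pos += 2;
                } else if (c == '"') {
                    in_quotes = 0;       /* closing quote */
                    pos++;
                } else {
                    field[out++] = c;
                    pos++;
                }
            } else if (c == '"') {
                in_quotes = 1;           /* opening quote */
                pos++;
            } else if (c == delim) {
                break;                   /* end of this field */
            } else {
                field[out++] = c;
                pos++;
            }
        }
        field[out] = '\0';

        if (count == cap) {
            char** grown = realloc(fields, cap * 2 * sizeof(char*));
            if (grown == NULL) { free(field); free_fields(fields, count); return NULL; }
            fields = grown;
            cap *= 2;
        }
        fields[count++] = field;

        if (pos >= len) break;  /* consumed the last field */
        pos++;                  /* skip the delimiter */
    }

    *out_count = count;
    return fields;
}
```

Returning the field count through an out-parameter lets the caller compare it against the schema before indexing into the array, which matters for ragged rows.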

Schema Inference for CSV Data

To work seamlessly with CSV files, the engine needs to infer schema from the data:

TableSchema* infer_schema_from_csv(FILE* file, bool has_header, char delimiter) {
    TableSchema* schema = malloc(sizeof(TableSchema));
    char* line = NULL;
    size_t len = 0;

    // Read header row if present
    if (getline(&line, &len, file) == -1) {
        free(schema);
        free(line);
        return NULL;
    }

    // Count columns and allocate arrays
    char** sample_row = parse_csv_line(line, &schema->column_count);
    schema->column_names = malloc(sizeof(char*) * schema->column_count);
    schema->column_types = malloc(sizeof(DataType) * schema->column_count);

    // Set initial column names
    if (has_header) {
        for (size_t i = 0; i < schema->column_count; i++) {
            schema->column_names[i] = strdup(sample_row[i]);
            // Default to string type initially
            schema->column_types[i] = TYPE_VARCHAR;
        }

        // Read another line for type inference
        free_string_array(sample_row, schema->column_count);
        if (getline(&line, &len, file) == -1) {
            // File only has header, default all to string
            free(line);
            return schema;
        }
        // Pass a real count pointer, matching the first call above
        size_t sample_fields = 0;
        sample_row = parse_csv_line(line, &sample_fields);
    } else {
        // Generate default column names
        for (size_t i = 0; i < schema->column_count; i++) {
            char col_name[32];
            snprintf(col_name, sizeof(col_name), "column%zu", i+1);
            schema->column_names[i] = strdup(col_name);
            schema->column_types[i] = TYPE_VARCHAR;
        }
    }

    // Infer types from sample row
    for (size_t i = 0; i < schema->column_count; i++) {
        schema->column_types[i] = infer_type_from_string(sample_row[i]);
    }

    // Sample more rows to confirm type inferences
    size_t sample_count = 0;
    while (sample_count < 100 && getline(&line, &len, file) != -1) {
        size_t field_count = 0;
        char** row = parse_csv_line(line, &field_count);

        // Ragged rows may be shorter than the header; only inspect fields that exist
        for (size_t i = 0; i < schema->column_count && i < field_count; i++) {
            DataType inferred = infer_type_from_string(row[i]);
            // Widen types as needed (int->float->string)
            schema->column_types[i] = widen_type(schema->column_types[i], inferred);
        }

        free_string_array(row, field_count);
        sample_count++;
    }

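    // Reset file position to start. This only works for seekable files:
    // a pipe cannot be rewound, so rows sampled from stdin would have to be
    // buffered and replayed instead.
    rewind(file);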
    if (has_header) {
        // Skip header row
        getline(&line, &len, file);
    }

    free_string_array(sample_row, schema->column_count);
    free(line);

    return schema;
}

DataType infer_type_from_string(const char* str) {
    if (str == NULL || strlen(str) == 0) {
        return TYPE_VARCHAR;
    }

    // Try integer (requires <errno.h>); reject empty parses and overflow
    char* end;
    errno = 0;
    (void)strtoll(str, &end, 10);
    if (end != str && *end == '\0' && errno != ERANGE) {
        return TYPE_INTEGER;
    }

    // Try float
    (void)strtod(str, &end);
    if (end != str && *end == '\0') {
        return TYPE_FLOAT;
    }

    // Try date (simple YYYY-MM-DD format)
    if (strlen(str) == 10 && str[4] == '-' && str[7] == '-') {
        int year, month, day;
        if (sscanf(str, "%d-%d-%d", &year, &month, &day) == 3) {
            if (year >= 1000 && year <= 9999 && month >= 1 && month <= 12 && day >= 1 && day <= 31) {
                return TYPE_DATE;
            }
        }
    }

    // Default to string
    return TYPE_VARCHAR;
}
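The schema code calls `widen_type`, which is not shown. A minimal sketch of one plausible widening rule follows. The enum here is a stand-in for the article's `DataType` and its member order is an assumption; the rule itself (int and float mix to float, anything else mixes to string) follows the `int->float->string` comment in the sampling loop.

```c
#include <assert.h>

/* Stand-in for the article's DataType enum; the ordering is an assumption. */
typedef enum { TYPE_INTEGER, TYPE_FLOAT, TYPE_DATE, TYPE_VARCHAR } DataType;

/* Return the narrowest type that can represent values of both inputs. */
DataType widen_type(DataType current, DataType observed) {
    assert(current <= TYPE_VARCHAR && observed <= TYPE_VARCHAR);
    if (current == observed) return current;
    /* Integers and floats mix into float. */
    if ((current == TYPE_INTEGER && observed == TYPE_FLOAT) ||
        (current == TYPE_FLOAT && observed == TYPE_INTEGER)) {
        return TYPE_FLOAT;
    }
    /* Any other mix (e.g. a date next to a number) only fits in a string. */
    return TYPE_VARCHAR;
}
```

One consequence worth noting: because `infer_type_from_string` returns `TYPE_VARCHAR` for an empty string, a single empty cell widens its whole column to a string under this rule. Treating empty cells as NULL and skipping them during inference is one way to avoid that.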

Query Processing Engine

The query processing architecture follows a vectorized execution model for efficiency:

// Forward declaration so the struct and its vtable can refer to Operator
typedef struct Operator Operator;

typedef struct {
    void (*initialize)(Operator* op);
    bool (*get_next)(Operator* op, DataChunk* output);
    void (*close)(Operator* op);
} OperatorVTable;

struct Operator {
    OperatorType type;        // SCAN, FILTER, PROJECT, etc.
    Operator* left;           // Left child
    Operator* right;          // Right child (for joins)
    void* operator_data;      // Operator-specific data
    OperatorVTable* vtable;   // Function table for this operator
};

// Example filter operator implementation
typedef struct {
    Expression* condition;
    SelectionVector* selection;
} FilterOperatorData;

void filter_initialize(Operator* op) {
    FilterOperatorData* data = (FilterOperatorData*)op->operator_data;
    data->selection = selection_vector_create(STANDARD_VECTOR_SIZE);

    // Initialize child operator
    op->left->vtable->initialize(op->left);
}

bool filter_get_next(Operator* op, DataChunk* output) {
    FilterOperatorData* data = (FilterOperatorData*)op->operator_data;

    // Loop instead of recursing, so a long run of batches with no matches
    // cannot exhaust the stack
    for (;;) {
        DataChunk input;
        initialize_data_chunk(&input);

        if (!op->left->vtable->get_next(op->left, &input)) {
            return false;  // No more input
        }

        // Evaluate filter condition
        Vector result_mask;
        initialize_boolean_vector(&result_mask, input.count);
        expression_evaluate(data->condition, &input, &result_mask);

        // Record the indices of matching rows in the selection vector
        size_t result_count = 0;
        for (size_t i = 0; i < input.count; i++) {
            if (get_boolean_value(&result_mask, i)) {
                data->selection->selection_vector[result_count++] = i;
            }
        }

        // If nothing matches, get next batch
        if (result_count == 0) {
            free_data_chunk(&input);
            continue;
        }

        // Copy selected rows to output
        output->column_count = input.column_count;
        output->count = result_count;
        output->vectors = malloc(sizeof(Vector*) * input.column_count);

        for (size_t i = 0; i < input.column_count; i++) {
            output->vectors[i] = vector_create(input.vectors[i]->type, result_count);
            vector_slice(input.vectors[i], output->vectors[i], data->selection, result_count);
        }

        free_data_chunk(&input);
        return true;
    }
}

void filter_close(Operator* op) {
    FilterOperatorData* data = (FilterOperatorData*)op->operator_data;

    selection_vector_destroy(data->selection);
    op->left->vtable->close(op->left);

    free(data);
    free(op);
}

This operator-based execution model allows efficient processing of data in chunks rather than row-by-row.
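The core of the filter, building a selection vector and then gathering the selected rows, can be shown in isolation. This is a minimal sketch on a plain `int64_t` column; `select_greater_than` and `gather_int64` are hypothetical names, not functions from the engine.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Fill `sel` with the indices of rows where col[i] > threshold.
 * Returns the number of selected rows; `sel` must hold at least n entries. */
size_t select_greater_than(const int64_t* col, size_t n, int64_t threshold,
                           uint32_t* sel) {
    size_t selected = 0;
    assert(col != NULL && sel != NULL);
    for (size_t i = 0; i < n; i++) {
        /* Branch-free form: always write, advance only on a match. */
        sel[selected] = (uint32_t)i;
        selected += (col[i] > threshold);
    }
    return selected;
}

/* Copy the selected rows of one column into a dense output array. */
void gather_int64(const int64_t* col, const uint32_t* sel, size_t selected,
                  int64_t* out) {
    assert(col != NULL && sel != NULL && out != NULL);
    for (size_t i = 0; i < selected; i++) {
        out[i] = col[sel[i]];
    }
}
```

The selection vector is computed once per batch and then reused to gather every column, which is why the filter evaluates its condition per chunk rather than per row.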

Vectorized Expression Evaluation

For efficient expression evaluation, I implemented a vectorized approach:

// Forward declaration so the struct can hold pointers to its own type
typedef struct Expression Expression;

struct Expression {
    ExpressionType type;   // COLUMN_REF, CONSTANT, FUNCTION_CALL, etc.
    DataType result_type;
    union {
        struct {
            size_t column_index;
        } column_ref;

        struct {
            Vector constant_val;
        } constant;

        struct {
            ScalarFunction function;
            Expression** children;
            size_t child_count;
        } function;

        struct {
            Expression* left;
            Expression* right;
            ComparisonType comparison_type;
        } comparison;
    } expr;
};

void expression_evaluate(Expression* expr, DataChunk* input, Vector* result) {
    switch (expr->type) {
        case EXPR_COLUMN_REF: {
            // Direct reference, just return the column
            size_t col_idx = expr->expr.column_ref.column_index;
            if (col_idx >= input->column_count) {
                // Error: invalid column reference
                vector_fill_null(result, input->count);
                return;
            }

            vector_copy(input->vectors[col_idx], result, input->count);
            break;
        }

        case EXPR_CONSTANT: {
            // Fill result with constant value
            vector_copy(&expr->expr.constant.constant_val, result, input->count);
            break;
        }

        case EXPR_FUNCTION_CALL: {
            // Evaluate function arguments first
            Vector** args = malloc(sizeof(Vector*) * expr->expr.function.child_count);
            for (size_t i = 0; i < expr->expr.function.child_count; i++) {
                args[i] = vector_create(expr->expr.function.children[i]->result_type, input->count);
                expression_evaluate(expr->expr.function.children[i], input, args[i]);
            }

            // Call function
            expr->expr.function.function(args, expr->expr.function.child_count, result, input->count);

            // Clean up argument vectors
            for (size_t i = 0; i < expr->expr.function.child_count; i++) {
                vector_destroy(args[i]);
            }
            free(args);
            break;
        }

        case EXPR_COMPARISON: {
            // Evaluate both sides of comparison
            Vector* left = vector_create(expr->expr.comparison.left->result_type, input->count);
            Vector* right = vector_create(expr->expr.comparison.right->result_type, input->count);

            expression_evaluate(expr->expr.comparison.left, input, left);
            expression_evaluate(expr->expr.comparison.right, input, right);

            // Apply comparison
            comparison_execute(expr->expr.comparison.comparison_type, left, right, result, input->count);

            vector_destroy(left);
            vector_destroy(right);
            break;
        }

        // Other expression types...
    }
}

Memory Management for Streaming

To handle large data streams without holding every input row in memory, I implemented a streaming aggregation approach:

typedef struct {
    size_t group_col_count;
    size_t* group_col_indices;
    size_t agg_count;
    AggregateFunction** agg_functions;
    size_t* agg_col_indices;

    // Hash table for grouping
    HashTable* groups;

    // Output state
    bool output_initialized;
    HashTableIterator* iterator;
} AggregateOperatorData;

bool aggregate_get_next(Operator* op, DataChunk* output) {
    AggregateOperatorData* data = (AggregateOperatorData*)op->operator_data;

    if (!data->output_initialized) {
        // Process all input first
        DataChunk input;
        initialize_data_chunk(&input);

        while (op->left->vtable->get_next(op->left, &input)) {
            // Extract group by keys
            Vector** group_vectors = malloc(sizeof(Vector*) * data->group_col_count);
            for (size_t i = 0; i < data->group_col_count; i++) {
                group_vectors[i] = input.vectors[data->group_col_indices[i]];
            }

            // For each row, find or create group and apply aggregates
            for (size_t row = 0; row < input.count; row++) {
                // Create a hash key for this group
                HashKey key = create_hash_key(group_vectors, data->group_col_count, row);

                // Find or create group
                HashEntry* entry = hash_table_find_or_create(data->groups, key);
                AggregateState* state = (AggregateState*)entry->value;

                if (state == NULL) {
                    // New group, initialize state
                    state = aggregate_state_create(data->agg_count);
                    entry->value = state;

                    // Copy group key values
                    for (size_t i = 0; i < data->group_col_count; i++) {
                        vector_get_value(group_vectors[i], row, &state->group_values[i]);
                    }
                }

                // Apply aggregates for this row
                for (size_t i = 0; i < data->agg_count; i++) {
                    Vector* input_vector = input.vectors[data->agg_col_indices[i]];
                    Value input_value;
                    vector_get_value(input_vector, row, &input_value);

                    data->agg_functions[i]->update(state->aggregate_values[i], input_value);
                }
            }

            free(group_vectors);
            free_data_chunk(&input);
            initialize_data_chunk(&input);
        }

        // Set up iteration for output
        data->iterator = hash_table_iterate(data->groups);
        data->output_initialized = true;
    }

    // Output next batch of aggregated results
    const size_t max_output = STANDARD_VECTOR_SIZE;
    size_t output_count = 0;

    // Allocate output vectors: first the group by columns, then the aggregates
    output->column_count = data->group_col_count + data->agg_count;
    output->vectors = malloc(sizeof(Vector*) * output->column_count);

    // Create output vectors
    for (size_t i = 0; i < output->column_count; i++) {
        DataType type;
        if (i < data->group_col_count) {
            // Group column type
            type = get_group_column_type(data, i);
        } else {
            // Aggregate result type
            type = data->agg_functions[i - data->group_col_count]->result_type;
        }
        output->vectors[i] = vector_create(type, max_output);
    }

    // Fetch results from hash table
    HashEntry* entry;
    while (output_count < max_output && (entry = hash_table_iterator_next(data->iterator)) != NULL) {
        AggregateState* state = (AggregateState*)entry->value;

        // Copy group values
        for (size_t i = 0; i < data->group_col_count; i++) {
            vector_set_value(output->vectors[i], output_count, state->group_values[i]);
        }

        // Finalize and copy aggregate values
        for (size_t i = 0; i < data->agg_count; i++) {
            Value result = data->agg_functions[i]->finalize(state->aggregate_values[i]);
            vector_set_value(output->vectors[i + data->group_col_count], output_count, result);
        }

        output_count++;
    }

    output->count = output_count;

    // Return false when no more groups
    if (output_count == 0) {
        free_data_chunk(output);
        return false;
    }

    return true;
}

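This approach handles aggregation efficiently, even with large input streams, by folding each row into its group's running state rather than materializing the entire dataset. Memory use therefore grows with the number of distinct groups, not with the number of input rows.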
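The find-or-create-then-update pattern can be sketched without the engine's `HashTable` type. The following is an illustrative, fixed-capacity version with a single integer key and `SUM`/`COUNT` state; `GroupTable`, `group_table_update`, and `group_table_find` are hypothetical names, and a real implementation would grow the table instead of failing when it is full.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define GROUP_SLOTS 1024  /* fixed capacity (power of two), for brevity */

typedef struct {
    bool used;
    int64_t key;    /* GROUP BY value */
    int64_t sum;    /* running SUM(value) */
    int64_t count;  /* running COUNT(*) */
} GroupSlot;

typedef struct {
    GroupSlot slots[GROUP_SLOTS];
    size_t group_count;
} GroupTable;

void group_table_init(GroupTable* t) {
    assert(t != NULL);
    memset(t, 0, sizeof(*t));
}

static size_t slot_for(int64_t key) {
    uint64_t h = (uint64_t)key * 0x9E3779B97F4A7C15ULL;  /* multiplicative hash */
    return (size_t)(h >> 32) & (GROUP_SLOTS - 1);
}

/* Fold one row into its group. Returns false if the table is full. */
bool group_table_update(GroupTable* t, int64_t key, int64_t value) {
    size_t start = slot_for(key);
    for (size_t probe = 0; probe < GROUP_SLOTS; probe++) {
        GroupSlot* s = &t->slots[(start + probe) & (GROUP_SLOTS - 1)];
        if (!s->used) {
            /* First row of a new group: create its state. */
            s->used = true;
            s->key = key;
            s->sum = 0;
            s->count = 0;
            t->group_count++;
        }
        if (s->key == key) {
            s->sum += value;
            s->count++;
            return true;
        }
    }
    return false;
}

/* Look up a group's state, or NULL if the key was never seen. */
const GroupSlot* group_table_find(const GroupTable* t, int64_t key) {
    size_t start = slot_for(key);
    for (size_t probe = 0; probe < GROUP_SLOTS; probe++) {
        const GroupSlot* s = &t->slots[(start + probe) & (GROUP_SLOTS - 1)];
        if (!s->used) return NULL;
        if (s->key == key) return s;
    }
    return NULL;
}
```

Each input row touches exactly one slot and is then discarded, so the table's size depends only on how many distinct keys appear in the stream.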

Integration with Linux Pipes and Signals

To work properly in Linux environments, the engine needs to handle pipe semantics and signals correctly:

// Set up graceful termination
volatile sig_atomic_t stop_requested = 0;

void handle_signal(int sig) {
    (void)sig;  // Only record the request; cleanup happens outside the handler
    stop_requested = 1;
}

void setup_signal_handlers() {
    struct sigaction sa;
    memset(&sa, 0, sizeof(sa));
    sa.sa_handler = handle_signal;
    sigaction(SIGINT, &sa, NULL);
    sigaction(SIGTERM, &sa, NULL);
}

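// Make stdin non-blocking for better pipe handling.
// Caution: once O_NONBLOCK is set, stdio reads such as getline() can fail
// with EAGAIN when no data is ready, which must not be mistaken for EOF.
void setup_nonblocking_stdin() {
    int flags = fcntl(STDIN_FILENO, F_GETFL, 0);
    if (flags == -1) {
        return;  // Leave stdin unchanged if the flags cannot be read
    }
    fcntl(STDIN_FILENO, F_SETFL, flags | O_NONBLOCK);
}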

// Check for termination requests during processing
bool should_continue_processing() {
    return !stop_requested;
}

This signal handling lets the program respond to Ctrl+C and other termination signals. The handler itself only sets a flag; the processing loops check that flag and clean up resources before exiting.
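To show how the flag is meant to be consumed, here is a small self-contained sketch of a batch loop that stops at a safe point once a signal arrives. The names `stop_flag`, `install_stop_handlers`, and `sum_until_stopped` are hypothetical, and summing an array stands in for real query processing.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <signal.h>
#include <stddef.h>
#include <string.h>

static volatile sig_atomic_t stop_flag = 0;

static void on_stop_signal(int sig) {
    (void)sig;      /* the handler only records the request */
    stop_flag = 1;
}

/* Install the handler for SIGINT and SIGTERM. Returns 0 on success. */
int install_stop_handlers(void) {
    struct sigaction sa;
    memset(&sa, 0, sizeof(sa));
    sa.sa_handler = on_stop_signal;
    sigemptyset(&sa.sa_mask);
    sa.sa_flags = 0;  /* no SA_RESTART: blocking reads return EINTR */
    if (sigaction(SIGINT, &sa, NULL) != 0) return -1;
    if (sigaction(SIGTERM, &sa, NULL) != 0) return -1;
    return 0;
}

/* Sum values batch by batch, checking the flag between batches so that a
 * signal stops the loop at a batch boundary. Returns batches processed. */
size_t sum_until_stopped(const long* values, size_t n, size_t batch_size,
                         long* total) {
    size_t batches = 0;
    assert(values != NULL && total != NULL && batch_size > 0);
    *total = 0;
    for (size_t start = 0; start < n && !stop_flag; start += batch_size) {
        size_t end = (n - start < batch_size) ? n : start + batch_size;
        for (size_t i = start; i < end; i++) *total += values[i];
        batches++;
    }
    return batches;  /* caller performs cleanup outside the handler */
}
```

Checking the flag only between batches keeps the inner loop tight while still bounding how long the program takes to react to Ctrl+C.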

Practical Examples and Performance Tuning

Processing Server Logs

One of my most common tasks is analyzing web server logs:

# Find top client IPs by number of 404 responses
cat /var/log/nginx/access.log | 
  csv-sql -f csv --delimiter=' ' \
  "SELECT split_part(column1, ' - ', 1) as client_ip, 
          COUNT(*) as count 
   FROM stdin 
   WHERE column9 = '404' 
   GROUP BY client_ip 
   ORDER BY count DESC 
   LIMIT 10"

System Monitoring and Analysis

I frequently use the tool to analyze system metrics:

# Calculate 95th percentile response time by endpoint
cat performance_metrics.csv | 
  csv-sql "SELECT endpoint, 
                  percentile_cont(0.95) WITHIN GROUP (ORDER BY response_time) as p95 
           FROM stdin 
           GROUP BY endpoint 
           ORDER BY p95 DESC"
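For readers unfamiliar with `percentile_cont`, the computation behind it is linear interpolation between the two nearest values in sorted order. The sketch below illustrates that definition on a plain array; it is not the engine's aggregate implementation, and the in-place sort is a simplification chosen for brevity.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

static int compare_doubles(const void* a, const void* b) {
    double x = *(const double*)a;
    double y = *(const double*)b;
    return (x > y) - (x < y);
}

/* Sorts `values` in place and returns the interpolated percentile.
 * `fraction` must be in [0, 1] and `n` must be at least 1. */
double percentile_cont(double* values, size_t n, double fraction) {
    double rank, weight;
    size_t lower, upper;
    assert(values != NULL && n > 0);
    assert(fraction >= 0.0 && fraction <= 1.0);

    qsort(values, n, sizeof(double), compare_doubles);
    rank = fraction * (double)(n - 1);   /* zero-based fractional position */
    lower = (size_t)rank;                /* truncation equals floor for rank >= 0 */
    upper = (lower + 1 < n) ? lower + 1 : lower;
    weight = rank - (double)lower;
    return values[lower] + weight * (values[upper] - values[lower]);
}
```

Because the percentile needs every value of a group in sorted order, this aggregate cannot be folded into a constant-size running state the way `SUM` or `COUNT` can.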

Resource Utilization During Processing

Memory efficiency is critical when processing large datasets on constrained systems. I use a staged approach for operations like sorting that can't be fully streamed:

void sort_initialize(Operator* op) {
    SortOperatorData* data = (SortOperatorData*)op->operator_data;
    data->sorted = false;
    data->current_pos = 0;

    // Determine memory limits
    size_t available_memory = get_available_memory() * 0.8;  // Use 80% of available memory
    size_t row_estimate_size = estimate_row_size(data->input_types, data->input_count);
    data->max_rows = available_memory / row_estimate_size;

    if (data->max_rows < MINIMUM_SORT_ROWS) {
        data->max_rows = MINIMUM_SORT_ROWS;  // Ensure minimum batch size
    }

    data->storage = vector_heap_create(data->max_rows);
}

bool sort_get_next(Operator* op, DataChunk* output) {
    SortOperatorData* data = (SortOperatorData*)op->operator_data;

    if (!data->sorted) {
        // Load and sort data
        DataChunk input;
        initialize_data_chunk(&input);

        while (op->left->vtable->get_next(op->left, &input)) {
            // Check if we need to switch to external sort
            if (data->storage->count + input.count > data->max_rows && 
                !data->storage->is_external) {
                // Convert to external storage
                vector_heap_convert_to_external(data->storage);
            }

            // Add rows to storage
            for (size_t i = 0; i < input.count; i++) {
                VectorHeapRow row = extract_row_from_chunk(&input, i);
                vector_heap_add(data->storage, row);
            }

            free_data_chunk(&input);
            initialize_data_chunk(&input);

            // Check for user interruption
            if (stop_requested) {
                clean_up_and_exit(op);
                return false;
            }
        }

        // Sort the data
        vector_heap_sort(data->storage, data->sort_expressions, data->sort_directions, 
                        data->expr_count);
        data->sorted = true;
    }

    // Output next batch of sorted data
    size_t output_count = 0;
    const size_t max_output = STANDARD_VECTOR_SIZE;

    // Set up output chunk
    initialize_data_chunk(output);
    output->column_count = data->input_count;
    output->vectors = malloc(sizeof(Vector*) * output->column_count);
    for (size_t i = 0; i < output->column_count; i++) {
        output->vectors[i] = vector_create(data->input_types[i], max_output);
    }

    // Fetch sorted rows
    while (output_count < max_output && data->current_pos < data->storage->count) {
        VectorHeapRow row = vector_heap_get(data->storage, data->current_pos);

        for (size_t i = 0; i < output->column_count; i++) {
            vector_set_value(output->vectors[i], output_count, row.values[i]);
        }

        data->current_pos++;
        output_count++;
    }

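    output->count = output_count;

    // Release the empty chunk on the final call so its vectors are not leaked
    if (output_count == 0) {
        free_data_chunk(output);
        return false;
    }

    return true;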
}
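The external path (`vector_heap_convert_to_external`) is not shown above. One common way to implement it, assumed here purely for illustration, is to spill sorted runs and then merge them. The merge step at the heart of that scheme looks like this for two runs of `int64_t` keys; `merge_sorted_runs` is a hypothetical name.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Merge two sorted runs into `out`, which must hold a_len + b_len values.
 * Stable: on ties, values from run `a` come first. */
void merge_sorted_runs(const int64_t* a, size_t a_len,
                       const int64_t* b, size_t b_len, int64_t* out) {
    size_t i = 0, j = 0, k = 0;
    assert(out != NULL);
    assert((a != NULL || a_len == 0) && (b != NULL || b_len == 0));
    while (i < a_len && j < b_len) {
        out[k++] = (b[j] < a[i]) ? b[j++] : a[i++];
    }
    while (i < a_len) out[k++] = a[i++];  /* drain what is left of run a */
    while (j < b_len) out[k++] = b[j++];  /* drain what is left of run b */
}
```

The merge reads each run strictly front to back, so in an on-disk variant only one buffer per run has to be resident in memory at a time.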

Performance Optimizations

Columnar Data Processing

To achieve better CPU cache utilization, I use a columnar data representation:

typedef struct {
    DataType type;
    void* data;           // Raw type-specific data
    bool* nullmask;       // Per-row NULL flags (one bool per value, not a packed bitmap)
    size_t capacity;      // Allocated capacity
} Vector;

typedef struct {
    size_t column_count;
    Vector** vectors;
    size_t count;         // Number of rows
} DataChunk;

Vector* vector_create(DataType type, size_t capacity) {
    Vector* vector = malloc(sizeof(Vector));
    vector->type = type;
    vector->capacity = capacity;

    // Allocate data array based on type
    switch (type) {
        case TYPE_INTEGER:
            vector->data = malloc(sizeof(int64_t) * capacity);
            break;
        case TYPE_FLOAT:
            vector->data = malloc(sizeof(double) * capacity);
            break;
        case TYPE_VARCHAR:
            // For strings, we store char* pointers
            vector->data = malloc(sizeof(char*) * capacity);
            memset(vector->data, 0, sizeof(char*) * capacity);
            break;
        case TYPE_BOOLEAN:
            vector->data = malloc(sizeof(bool) * capacity);
            break;
        case TYPE_DATE:
            vector->data = malloc(sizeof(date_t) * capacity);
            break;
        default:
            // Unknown type: avoid leaving an uninitialized data pointer
            vector->data = NULL;
            break;
    }

    // Allocate NULL mask
    vector->nullmask = malloc(sizeof(bool) * capacity);
    memset(vector->nullmask, 0, sizeof(bool) * capacity);

    return vector;
}

This columnar approach helps analytical queries compared to row-based processing, especially when operations only need to touch a subset of columns: each operation reads only the arrays of the columns it uses, so the CPU cache is not filled with unrelated fields.
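As a small illustration of why this layout suits aggregates, the sketch below sums a single column while honoring its NULL flags. It works on raw arrays rather than the `Vector` struct, and `column_sum` is a hypothetical helper name.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Sum one int64 column, skipping NULL rows. Only this column's value and
 * null arrays are read; other columns of the table are never touched. */
int64_t column_sum(const int64_t* values, const bool* is_null, size_t count,
                   size_t* non_null_count) {
    int64_t sum = 0;
    size_t seen = 0;
    assert(values != NULL && is_null != NULL && non_null_count != NULL);
    for (size_t i = 0; i < count; i++) {
        if (!is_null[i]) {
            sum += values[i];
            seen++;
        }
    }
    *non_null_count = seen;
    return sum;
}
```

In a row-oriented layout the same loop would stride over every field of every row; here it walks two contiguous arrays.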

SIMD Optimizations for Filter Operations

For filtering operations on numeric data, I use SIMD instructions when available:

void comparison_equals_integers_simd(Vector* left, Vector* right, Vector* result, size_t count) {
    int64_t* left_data = (int64_t*)left->data;
    int64_t* right_data = (int64_t*)right->data;
    bool* result_data = (bool*)result->data;

#ifdef __AVX2__
    // Use AVX2 if available
    size_t i = 0;
    for (; i + 4 <= count; i += 4) {
        __m256i left_vec = _mm256_loadu_si256((__m256i*)&left_data[i]);
        __m256i right_vec = _mm256_loadu_si256((__m256i*)&right_data[i]);
        __m256i cmp_result = _mm256_cmpeq_epi64(left_vec, right_vec);

        // Collapse the four 64-bit lane results into one 4-bit mask
        int mask = _mm256_movemask_pd(_mm256_castsi256_pd(cmp_result));
        result_data[i] = (mask & 1) != 0;
        result_data[i+1] = (mask & 2) != 0;
        result_data[i+2] = (mask & 4) != 0;
        result_data[i+3] = (mask & 8) != 0;
    }

    // Process remaining elements
    for (; i < count; i++) {
        result_data[i] = left_data[i] == right_data[i];
    }
#else
    // Fallback to scalar code
    for (size_t i = 0; i < count; i++) {
        result_data[i] = left_data[i] == right_data[i];
    }
#endif

    // Handle nulls
    for (size_t i = 0; i < count; i++) {
        if (left->nullmask[i] || right->nullmask[i]) {
            result->nullmask[i] = true;
            result_data[i] = false;
        } else {
            result->nullmask[i] = false;
        }
    }
}

Parallel Query Execution

For multi-core systems, I implemented parallel execution strategies:

typedef struct {
    Operator* original_op;           // Original operator tree
    Operator** parallel_ops;         // Per-thread operator trees
    size_t thread_count;             // Number of threads
    pthread_t* threads;              // Thread handles
    Queue** result_queues;           // Per-thread result queues
    bool* thread_finished;           // Thread status flags
    pthread_mutex_t mutex;           // Synchronization mutex
    size_t next_result_thread;       // Next thread to get results from
} ParallelScanOperatorData;

void* parallel_scan_thread(void* arg) {
    ThreadContext* ctx = (ThreadContext*)arg;
    Operator* op = ctx->operator;
    Queue* queue = ctx->result_queue;

    DataChunk chunk;
    initialize_data_chunk(&chunk);

    while (op->vtable->get_next(op, &chunk)) {
        // Clone the chunk for thread safety
        DataChunk* chunk_copy = clone_data_chunk(&chunk);

        // Add to result queue
        queue_push(queue, chunk_copy);

        // Release the original before reusing the chunk for the next batch
        free_data_chunk(&chunk);
        initialize_data_chunk(&chunk);

        // Check for termination request
        if (stop_requested) {
            break;
        }
    }

    // Signal that this thread is done
    pthread_mutex_lock(&ctx->parent->mutex);
    ctx->parent->thread_finished[ctx->thread_id] = true;
    pthread_mutex_unlock(&ctx->parent->mutex);

    return NULL;
}

bool parallel_scan_get_next(Operator* op, DataChunk* output) {
    ParallelScanOperatorData* data = (ParallelScanOperatorData*)op->operator_data;

    // Initialize threads on first call
    if (data->threads == NULL) {
        initialize_parallel_scan(data);
    }

    // Number of threads found finished with an empty queue during this call
    size_t drained_threads = 0;

    while (true) {
        pthread_mutex_lock(&data->mutex);

        // Check if we have a result from the current thread
        Queue* current_queue = data->result_queues[data->next_result_thread];
        DataChunk* result = queue_pop(current_queue);

        if (result != NULL) {
            // We have a result, copy it to output
            copy_data_chunk(result, output);
            free_data_chunk(result);
            free(result);

            pthread_mutex_unlock(&data->mutex);
            return true;
        }

        // No result from current thread, check if it's finished
        if (data->thread_finished[data->next_result_thread]) {
            // This thread's queue is drained for good; move to the next thread
            data->next_result_thread = (data->next_result_thread + 1) % data->thread_count;
            drained_threads++;

            // Only stop once every thread is finished AND its queue is empty;
            // checking the finished flags alone could drop queued chunks
            if (drained_threads >= data->thread_count) {
                pthread_mutex_unlock(&data->mutex);
                return false;
            }

            pthread_mutex_unlock(&data->mutex);
            continue;  // Try the next thread's queue immediately
        }

        pthread_mutex_unlock(&data->mutex);

        // No results available yet, wait a bit
        usleep(1000);  // 1ms sleep to avoid spinning
    }
}

Production-Grade Considerations

Error Handling and Resilience

To ensure reliability in production environments, robust error handling is essential:

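// Requires an `int result` variable in the calling function: the macro stores
// the error code there so the function does not report success on failure.
#define TRY_GOTO(expr, label) \
    do { \
        int try_error_ = (expr); \
        if (try_error_) { \
            fprintf(stderr, "Error at %s:%d: %s\n", __FILE__, __LINE__, error_message(try_error_)); \
            result = try_error_; \
            goto label; \
        } \
    } while (0)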

int execute_query_with_error_handling(DatabaseInstance* db, const char* query) {
    int result = 0;
    QueryPlan* plan = NULL;
    Operator* root_op = NULL;

    // Parse and prepare query
    TRY_GOTO(parse_query(query, &plan), cleanup);
    TRY_GOTO(prepare_query(db, plan), cleanup);
    TRY_GOTO(optimize_query(db, plan), cleanup);
    TRY_GOTO(build_execution_tree(db, plan, &root_op), cleanup);

    // Execute query
    result = execute_query_tree(db, root_op);

cleanup:
    if (root_op) close_operator_tree(root_op);
    if (plan) free_query_plan(plan);
    return result;
}

Resource Management

Careful management of system resources is critical:

void configure_resource_limits(ResourceLimits* limits) {
    // Determine system resources
    struct sysinfo info;
    sysinfo(&info);

    // Calculate memory limits
    unsigned long total_mem = info.totalram * info.mem_unit;
    unsigned long mem_limit = MIN(total_mem * 0.7, MAX_MEMORY_USAGE);

    // Set limits
    limits->memory_limit = mem_limit;
    limits->max_threads = MIN(get_nprocs(), MAX_THREADS);

    // Print configuration
    fprintf(stderr, "Configured with:\n");
    fprintf(stderr, "  Memory limit: %lu MB\n", limits->memory_limit / (1024*1024));
    fprintf(stderr, "  Max threads: %d\n", limits->max_threads);
}

// Track memory usage
void* tracked_malloc(size_t size) {
    pthread_mutex_lock(&memory_mutex);

    if (current_memory_usage + size > memory_limit) {
        pthread_mutex_unlock(&memory_mutex);
        throw_out_of_memory_error();
        return NULL;
    }

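    void* ptr = malloc(size);
    if (ptr) {
        // Store size for later free
        size_t* size_ptr = malloc(sizeof(size_t));
        if (size_ptr == NULL) {
            // Without a size record the block could never be un-counted
            free(ptr);
            pthread_mutex_unlock(&memory_mutex);
            return NULL;
        }
        *size_ptr = size;
        hash_table_insert(allocation_table, ptr, size_ptr);
        current_memory_usage += size;
    }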

    pthread_mutex_unlock(&memory_mutex);
    return ptr;
}

void tracked_free(void* ptr) {
    if (!ptr) return;

    pthread_mutex_lock(&memory_mutex);

    size_t* size_ptr = hash_table_lookup(allocation_table, ptr);
    if (size_ptr) {
        current_memory_usage -= *size_ptr;
        hash_table_remove(allocation_table, ptr);
        free(size_ptr);
    }

    free(ptr);

    pthread_mutex_unlock(&memory_mutex);
}
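An alternative to the side table is to store each block's size in a small header placed directly in front of the memory handed to the caller. The sketch below shows that variant under stated assumptions: it is single-threaded (no mutex), and `AllocHeader`, `header_malloc`, `header_free`, `tracked_bytes`, and `tracked_limit` are hypothetical names that are not part of the code above.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Header stored in front of each allocation; the union pads it so the
 * pointer returned to the caller keeps malloc's alignment guarantee. */
typedef union {
    size_t size;
    max_align_t align;
} AllocHeader;

static size_t tracked_bytes = 0;           /* not thread-safe in this sketch */
static size_t tracked_limit = (size_t)-1;  /* effectively unlimited by default */

void* header_malloc(size_t size) {
    AllocHeader* header;
    /* Refuse requests that would push usage past the limit. */
    if (tracked_bytes > tracked_limit || size > tracked_limit - tracked_bytes) {
        return NULL;
    }
    /* Guard against overflow when adding the header size. */
    if (size > (size_t)-1 - sizeof(AllocHeader)) return NULL;

    header = malloc(sizeof(AllocHeader) + size);
    if (header == NULL) return NULL;
    header->size = size;
    tracked_bytes += size;
    return header + 1;  /* caller's memory starts right after the header */
}

void header_free(void* ptr) {
    AllocHeader* header;
    if (ptr == NULL) return;
    header = (AllocHeader*)ptr - 1;  /* step back to the header */
    assert(tracked_bytes >= header->size);
    tracked_bytes -= header->size;
    free(header);
}
```

This trades a few bytes per allocation for constant-time bookkeeping with no second allocation, at the cost that pointers from `header_malloc` must never be passed to plain `free`.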

Conclusion

Building a command-line SQL engine for Linux data pipelines has been a challenging but rewarding project. The architecture described here provides a powerful tool for data analysis directly within the shell environment, enabling sophisticated data manipulation without the overhead of a traditional database.

The key insights from this implementation are:

  1. Columnar data representation dramatically improves performance for analytical workloads

  2. Vectorized execution provides efficient CPU utilization

  3. Stream processing enables handling datasets larger than available memory

  4. Unix integration makes the tool feel like a natural part of the shell environment

With these techniques, the engine is designed to process multi-gigabyte inputs on modest hardware, making it a useful addition to a Linux systems engineer's toolkit.

I will continue to explore and expand on this tool and share my findings in future articles.
