Building a Command-Line SQL Engine for Linux Data Pipelines

As a Linux systems engineer, I've often needed to analyze structured data on the fly. When processing logs, system metrics, or CSV exports, I wanted something that would integrate seamlessly with existing shell scripts and pipeline workflows without the overhead of setting up a traditional database server.
This article details my journey implementing a lightweight SQL engine for Linux command-line environments.
The Unix Philosophy Meets SQL
In Linux environments, we're accustomed to powerful one-liners using tools like grep, awk, and sed. However, these tools lack SQL's expressiveness for complex analytics. My goal was to create a command-line SQL engine that embraces the Unix philosophy:
Do one thing well
Expect output of one program to be input to another
Handle text streams as a universal interface
Core Architecture Design
After examining high-performance embedded database implementations, I settled on an architecture that balances performance with simplicity. Let's explore the key components:
Command-Line Interface Integration
The entry point supports standard Unix I/O patterns:
# Process a local CSV file
csv-sql "SELECT * FROM 'data.csv' WHERE value > 100" > filtered.csv
# Process stdin from a pipeline
cat data.csv | csv-sql "SELECT id, SUM(value) FROM stdin GROUP BY id"
# Process remote data
curl https://example.com/data.csv | csv-sql "SELECT * FROM stdin LIMIT 10"
# Join data from multiple sources
csv-sql "SELECT a.timestamp, a.status, b.user
FROM 'server_logs.csv' a
JOIN 'users.csv' b
ON a.user_id = b.id
WHERE a.status = 404"
After examining how modern embedded SQL engines handle their command lines, I implemented a similar argument-parsing approach:
int main(int argc, char *argv[]) {
config config;
memset(&config, 0, sizeof(config));
// Default configuration
config.format = FORMAT_CSV;
config.stdin_table = "stdin";
// Parse command line arguments
for (int i = 1; i < argc; i++) {
if (strcmp(argv[i], "-f") == 0 || strcmp(argv[i], "--format") == 0) {
if (i + 1 < argc) {
if (strcmp(argv[i+1], "csv") == 0) config.format = FORMAT_CSV;
else if (strcmp(argv[i+1], "json") == 0) config.format = FORMAT_JSON;
else if (strcmp(argv[i+1], "table") == 0) config.format = FORMAT_TABLE;
i++;
}
} else if (strcmp(argv[i], "--stdin-name") == 0) {
if (i + 1 < argc) {
config.stdin_table = argv[i+1];
i++;
}
} else if (config.query == NULL) {
config.query = argv[i];
}
}
if (config.query == NULL) {
fprintf(stderr, "Usage: csv-sql [OPTIONS] \"SQL QUERY\"\n");
return 1;
}
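// Check if stdin has data: either a pipe (cmd | csv-sql ...)
// or a redirected regular file (csv-sql ... < data.csv)
struct stat st;
bool has_stdin = false;
if (fstat(STDIN_FILENO, &st) == 0 && (S_ISFIFO(st.st_mode) || S_ISREG(st.st_mode))) {
has_stdin = true;
}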
// Initialize database and run query
DatabaseInstance* db = db_init();
if (has_stdin) {
db_register_stdin(db, config.stdin_table);
}
QueryResult* result = db_execute_query(db, config.query);
output_result(result, config.format);
db_cleanup(db);
return 0;
}
This approach works both interactively and inside pipelines: it checks whether standard input is a data stream rather than a terminal and, when it is, registers it as a queryable table under the configured name.
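To make the stdin check easier to reason about, here is a minimal sketch that classifies a file descriptor with fstat and isatty. The names InputKind and classify_input are hypothetical and not part of the engine described here; the sketch only illustrates which cases the check has to distinguish.

```c
#define _POSIX_C_SOURCE 200809L
#include <sys/stat.h>
#include <unistd.h>

/* Hypothetical classification of what a descriptor is attached to. */
typedef enum {
    INPUT_NONE,   /* terminal or unusable descriptor: nothing to query */
    INPUT_PIPE,   /* producer | csv-sql "..." */
    INPUT_FILE,   /* csv-sql "..." < data.csv */
    INPUT_OTHER   /* socket, character device, and so on */
} InputKind;

InputKind classify_input(int fd) {
    struct stat st;
    if (isatty(fd)) return INPUT_NONE;           /* interactive terminal */
    if (fstat(fd, &st) != 0) return INPUT_NONE;  /* closed or invalid descriptor */
    if (S_ISFIFO(st.st_mode)) return INPUT_PIPE;
    if (S_ISREG(st.st_mode)) return INPUT_FILE;
    return INPUT_OTHER;
}
```

Calling classify_input(STDIN_FILENO) at startup would let the tool register stdin only for INPUT_PIPE and INPUT_FILE, and decide separately what to do with the rarer INPUT_OTHER case.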
Streaming Data Source Management
The key to working with Unix pipes is treating them as first-class data sources. My implementation uses a virtual table system to abstract various data sources:
// Forward declarations so the two structs can refer to each other
typedef struct DataSource DataSource;
typedef struct DataSourceVTable DataSourceVTable;
struct DataSource {
DataSourceType type; // FILE, STDIN, HTTP, etc.
char* name; // Logical name in queries
void* handle; // Source-specific handle
bool is_pipe; // Is this a Unix pipe?
TableSchema* schema; // Inferred schema
DataSourceVTable* vtable; // Function pointers for operations
};
// Virtual function table for data sources
struct DataSourceVTable {
bool (*next_batch)(DataSource* source, DataChunk* output, size_t desired_count);
bool (*rewind)(DataSource* source);
void (*close)(DataSource* source);
};
For stdin handling, I implemented a specialized reader that can collect data in chunks:
bool stdin_next_batch(DataSource* source, DataChunk* output, size_t desired_count) {
FILE* stdin_handle = (FILE*)source->handle;
char* line = NULL;
size_t len = 0;
ssize_t read;
size_t lines_read = 0;
bool eof_reached = false;
// Allocate column vectors for this chunk
Vector** vectors = malloc(sizeof(Vector*) * source->schema->column_count);
for (size_t i = 0; i < source->schema->column_count; i++) {
vectors[i] = vector_create(source->schema->column_types[i], desired_count);
}
while (lines_read < desired_count) {
// Check if we need to wait for more input (for pipes)
if (source->is_pipe) {
fd_set read_fds;
struct timeval tv;
FD_ZERO(&read_fds);
FD_SET(STDIN_FILENO, &read_fds);
tv.tv_sec = 0;
tv.tv_usec = 100000; // 100ms timeout
int ret = select(STDIN_FILENO + 1, &read_fds, NULL, NULL, &tv);
if (ret <= 0) {
// No data available yet, but not EOF
break;
}
}
read = getline(&line, &len, stdin_handle);
if (read == -1) {
eof_reached = true;
break;
}
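// Parse CSV line into column values. Use a local count so that a
// malformed row cannot overwrite the schema's column count.
size_t field_count = 0;
char** values = parse_csv_line(line, &field_count);
// Add values to vectors; fields beyond the schema width are dropped
for (size_t i = 0; i < field_count; i++) {
if (i < source->schema->column_count) {
vector_append_from_string(vectors[i], values[i], lines_read);
}
free(values[i]);
}
free(values);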
lines_read++;
}
free(line);
// Set up the data chunk
output->column_count = source->schema->column_count;
output->vectors = vectors;
output->count = lines_read;
return lines_read > 0 || !eof_reached;
}
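This approach handles streaming input in bounded chunks: on a pipe it waits up to 100 ms for more data before returning whatever has been read so far, and each call allocates space for at most desired_count rows, so memory use does not grow with the length of the stream.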
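The reader above relies on a parse_csv_line helper that is not shown. As a rough illustration of what such a helper has to deal with, here is a minimal sketch of a field splitter that handles double-quoted fields and doubled quotes. The name split_csv_line and its signature are assumptions for this example, not the engine's actual helper, and it does not handle quoted fields that span multiple lines.

```c
#include <stdlib.h>
#include <string.h>

/* Hypothetical helper: split one CSV record into freshly allocated fields.
 * Returns NULL on allocation failure; the caller frees each field and the array. */
char** split_csv_line(const char* line, char delimiter, size_t* out_count) {
    size_t len = strlen(line);
    /* Drop the trailing newline (and CR) that getline() leaves in place */
    while (len > 0 && (line[len - 1] == '\n' || line[len - 1] == '\r')) len--;

    /* Upper bound on the field count: one more than the number of delimiters */
    size_t max_fields = 1;
    for (size_t i = 0; i < len; i++) {
        if (line[i] == delimiter) max_fields++;
    }
    char** fields = malloc(sizeof(char*) * max_fields);
    if (!fields) return NULL;

    size_t count = 0, pos = 0;
    for (;;) {
        /* A field can never be longer than the rest of the line */
        char* field = malloc(len - pos + 1);
        if (!field) {
            while (count > 0) free(fields[--count]);
            free(fields);
            return NULL;
        }
        size_t out = 0;
        int quoted = 0;
        if (pos < len && line[pos] == '"') { quoted = 1; pos++; }
        while (pos < len) {
            char c = line[pos];
            if (quoted && c == '"') {
                if (pos + 1 < len && line[pos + 1] == '"') {
                    field[out++] = '"';   /* "" inside quotes is a literal quote */
                    pos += 2;
                } else {
                    quoted = 0;           /* closing quote */
                    pos++;
                }
                continue;
            }
            if (!quoted && c == delimiter) break;
            field[out++] = c;
            pos++;
        }
        field[out] = '\0';
        fields[count++] = field;
        if (pos >= len) break;
        pos++;  /* skip the delimiter and start the next field */
    }
    *out_count = count;
    return fields;
}
```

Because the field count is returned through a separate out parameter, a caller can compare it against the schema width and decide how to treat short or long rows.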
Schema Inference for CSV Data
To work seamlessly with CSV files, the engine needs to infer schema from the data:
TableSchema* infer_schema_from_csv(FILE* file, bool has_header, char delimiter) {
TableSchema* schema = malloc(sizeof(TableSchema));
char* line = NULL;
size_t len = 0;
// Read header row if present
if (getline(&line, &len, file) == -1) {
free(schema);
free(line);
return NULL;
}
// Count columns and allocate arrays
char** sample_row = parse_csv_line(line, &schema->column_count);
schema->column_names = malloc(sizeof(char*) * schema->column_count);
schema->column_types = malloc(sizeof(DataType) * schema->column_count);
// Set initial column names
if (has_header) {
for (size_t i = 0; i < schema->column_count; i++) {
schema->column_names[i] = strdup(sample_row[i]);
// Default to string type initially
schema->column_types[i] = TYPE_VARCHAR;
}
// Read another line for type inference
free_string_array(sample_row, schema->column_count);
if (getline(&line, &len, file) == -1) {
// File only has header, default all to string
free(line);
return schema;
}
sample_row = parse_csv_line(line, NULL);
} else {
// Generate default column names
for (size_t i = 0; i < schema->column_count; i++) {
char col_name[32];
snprintf(col_name, sizeof(col_name), "column%zu", i+1);
schema->column_names[i] = strdup(col_name);
schema->column_types[i] = TYPE_VARCHAR;
}
}
// Infer types from sample row
for (size_t i = 0; i < schema->column_count; i++) {
schema->column_types[i] = infer_type_from_string(sample_row[i]);
}
// Sample more rows to confirm type inferences
size_t sample_count = 0;
while (sample_count < 100 && getline(&line, &len, file) != -1) {
char** row = parse_csv_line(line, NULL);
for (size_t i = 0; i < schema->column_count; i++) {
DataType inferred = infer_type_from_string(row[i]);
// Widen types as needed (int->float->string)
schema->column_types[i] = widen_type(schema->column_types[i], inferred);
}
free_string_array(row, schema->column_count);
sample_count++;
}
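// Reset file position to start. Note that rewind() only works on
// seekable files; for a pipe the sampled rows would have to be
// buffered and replayed instead.
rewind(file);
if (has_header) {
// Skip header row
getline(&line, &len, file);
}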
free_string_array(sample_row, schema->column_count);
free(line);
return schema;
}
DataType infer_type_from_string(const char* str) {
if (str == NULL || strlen(str) == 0) {
return TYPE_VARCHAR;
}
// Try integer (needs <errno.h>; values that overflow long long fall through)
char* end;
errno = 0;
(void)strtoll(str, &end, 10);
if (*end == '\0' && errno != ERANGE) {
return TYPE_INTEGER;
}
// Try float
(void)strtod(str, &end);
if (*end == '\0') {
return TYPE_FLOAT;
}
// Try date (simple YYYY-MM-DD format)
if (strlen(str) == 10 && str[4] == '-' && str[7] == '-') {
int year, month, day;
if (sscanf(str, "%d-%d-%d", &year, &month, &day) == 3) {
if (year >= 1000 && year <= 9999 && month >= 1 && month <= 12 && day >= 1 && day <= 31) {
return TYPE_DATE;
}
}
}
// Default to string
return TYPE_VARCHAR;
}
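The sampling loop calls widen_type without showing it. A minimal sketch of the int->float->string widening rule mentioned in the comment could look like the following. The enum here is a reduced stand-in for the engine's DataType, and the treatment of dates (anything mixed with a date falls back to string) is an assumption.

```c
/* Reduced stand-in for the engine's DataType enum (assumption for this sketch). */
typedef enum { TYPE_INTEGER, TYPE_FLOAT, TYPE_DATE, TYPE_VARCHAR } DataType;

/* Return the narrowest type that can represent both the type inferred so far
 * and the type observed in a new row. */
DataType widen_type(DataType current, DataType observed) {
    if (current == observed) return current;
    int current_numeric = (current == TYPE_INTEGER || current == TYPE_FLOAT);
    int observed_numeric = (observed == TYPE_INTEGER || observed == TYPE_FLOAT);
    /* Mixing integers and floats widens to float */
    if (current_numeric && observed_numeric) return TYPE_FLOAT;
    /* Any other mismatch can only be represented as a string */
    return TYPE_VARCHAR;
}
```

One consequence worth noting: since infer_type_from_string reports an empty cell as TYPE_VARCHAR, a single empty value would widen a numeric column to string under this rule. Skipping empty cells during sampling is one way to avoid that.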
Query Processing Engine
The query processing architecture follows a vectorized execution model for efficiency:
// Forward declarations so operators can reference each other and their vtable
typedef struct Operator Operator;
typedef struct OperatorVTable OperatorVTable;
struct Operator {
OperatorType type; // SCAN, FILTER, PROJECT, etc.
Operator* left; // Left child
Operator* right; // Right child (for joins)
void* operator_data; // Operator-specific data
OperatorVTable* vtable; // Function table for this operator
};
struct OperatorVTable {
void (*initialize)(Operator* op);
bool (*get_next)(Operator* op, DataChunk* output);
void (*close)(Operator* op);
};
// Example filter operator implementation
typedef struct {
Expression* condition;
SelectionVector* selection;
} FilterOperatorData;
void filter_initialize(Operator* op) {
FilterOperatorData* data = (FilterOperatorData*)op->operator_data;
data->selection = selection_vector_create(STANDARD_VECTOR_SIZE);
// Initialize child operator
op->left->vtable->initialize(op->left);
}
bool filter_get_next(Operator* op, DataChunk* output) {
FilterOperatorData* data = (FilterOperatorData*)op->operator_data;
DataChunk input;
size_t result_count = 0;
// Loop instead of recursing so that a long run of non-matching
// batches cannot exhaust the stack
while (result_count == 0) {
initialize_data_chunk(&input);
if (!op->left->vtable->get_next(op->left, &input)) {
return false; // No more input
}
// Evaluate filter condition
Vector result_mask;
initialize_boolean_vector(&result_mask, input.count);
expression_evaluate(data->condition, &input, &result_mask);
// Record the indices of matching rows in the selection vector
for (size_t i = 0; i < input.count; i++) {
if (get_boolean_value(&result_mask, i)) {
data->selection->selection_vector[result_count++] = i;
}
}
// (result_mask's buffers should be released here by whatever helper
// pairs with initialize_boolean_vector)
// If nothing matches, release this batch and fetch the next one
if (result_count == 0) {
free_data_chunk(&input);
}
}
// Copy selected rows to output
output->column_count = input.column_count;
output->count = result_count;
output->vectors = malloc(sizeof(Vector*) * input.column_count);
for (size_t i = 0; i < input.column_count; i++) {
output->vectors[i] = vector_create(input.vectors[i]->type, result_count);
vector_slice(input.vectors[i], output->vectors[i], data->selection, result_count);
}
free_data_chunk(&input);
return true;
}
void filter_close(Operator* op) {
FilterOperatorData* data = (FilterOperatorData*)op->operator_data;
selection_vector_destroy(data->selection);
op->left->vtable->close(op->left);
free(data);
free(op);
}
This operator-based execution model allows efficient processing of data in chunks rather than row-by-row.
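The selection-vector idea used by the filter can be shown in isolation. The sketch below works on plain int64_t arrays rather than the engine's Vector type, and the function names are hypothetical: one pass records the indices of matching rows, and a second pass gathers those rows from any column of the same chunk.

```c
#include <stddef.h>
#include <stdint.h>

/* Record the indices of rows whose value exceeds threshold.
 * selection must have room for count entries. Returns the number of matches. */
size_t select_greater_than(const int64_t* column, size_t count,
                           int64_t threshold, uint32_t* selection) {
    size_t matches = 0;
    for (size_t i = 0; i < count; i++) {
        if (column[i] > threshold) {
            selection[matches++] = (uint32_t)i;
        }
    }
    return matches;
}

/* Copy the selected rows of a column into a dense output array. */
void gather_int64(const int64_t* column, const uint32_t* selection,
                  size_t selected, int64_t* out) {
    for (size_t i = 0; i < selected; i++) {
        out[i] = column[selection[i]];
    }
}
```

The predicate is evaluated once per chunk, and the resulting selection is then reused for every column that the query actually outputs.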
Vectorized Expression Evaluation
For efficient expression evaluation, I implemented a vectorized approach:
// Forward declaration so an expression can hold pointers to its children
typedef struct Expression Expression;
struct Expression {
ExpressionType type; // COLUMN_REF, CONSTANT, FUNCTION_CALL, etc.
DataType result_type;
union {
struct {
size_t column_index;
} column_ref;
struct {
Vector constant_val;
} constant;
struct {
ScalarFunction function;
Expression** children;
size_t child_count;
} function;
struct {
Expression* left;
Expression* right;
ComparisonType comparison_type;
} comparison;
} expr;
};
void expression_evaluate(Expression* expr, DataChunk* input, Vector* result) {
switch (expr->type) {
case EXPR_COLUMN_REF: {
// Direct reference, just return the column
size_t col_idx = expr->expr.column_ref.column_index;
if (col_idx >= input->column_count) {
// Error: invalid column reference
vector_fill_null(result, input->count);
return;
}
vector_copy(input->vectors[col_idx], result, input->count);
break;
}
case EXPR_CONSTANT: {
// Fill result with constant value
vector_copy(&expr->expr.constant.constant_val, result, input->count);
break;
}
case EXPR_FUNCTION_CALL: {
// Evaluate function arguments first
Vector** args = malloc(sizeof(Vector*) * expr->expr.function.child_count);
for (size_t i = 0; i < expr->expr.function.child_count; i++) {
args[i] = vector_create(expr->expr.function.children[i]->result_type, input->count);
expression_evaluate(expr->expr.function.children[i], input, args[i]);
}
// Call function
expr->expr.function.function(args, expr->expr.function.child_count, result, input->count);
// Clean up argument vectors
for (size_t i = 0; i < expr->expr.function.child_count; i++) {
vector_destroy(args[i]);
}
free(args);
break;
}
case EXPR_COMPARISON: {
// Evaluate both sides of comparison
Vector* left = vector_create(expr->expr.comparison.left->result_type, input->count);
Vector* right = vector_create(expr->expr.comparison.right->result_type, input->count);
expression_evaluate(expr->expr.comparison.left, input, left);
expression_evaluate(expr->expr.comparison.right, input, right);
// Apply comparison
comparison_execute(expr->expr.comparison.comparison_type, left, right, result, input->count);
vector_destroy(left);
vector_destroy(right);
break;
}
// Other expression types...
}
}
Memory Management for Streaming
To handle potentially unbounded data streams while maintaining bounded memory usage, I implemented a streaming aggregation approach:
typedef struct {
size_t group_col_count;
size_t* group_col_indices;
size_t agg_count;
AggregateFunction** agg_functions;
size_t* agg_col_indices;
// Hash table for grouping
HashTable* groups;
// Output state
bool output_initialized;
HashTableIterator* iterator;
} AggregateOperatorData;
bool aggregate_get_next(Operator* op, DataChunk* output) {
AggregateOperatorData* data = (AggregateOperatorData*)op->operator_data;
if (!data->output_initialized) {
// Process all input first
DataChunk input;
initialize_data_chunk(&input);
while (op->left->vtable->get_next(op->left, &input)) {
// Extract group by keys
Vector** group_vectors = malloc(sizeof(Vector*) * data->group_col_count);
for (size_t i = 0; i < data->group_col_count; i++) {
group_vectors[i] = input.vectors[data->group_col_indices[i]];
}
// For each row, find or create group and apply aggregates
for (size_t row = 0; row < input.count; row++) {
// Create a hash key for this group
HashKey key = create_hash_key(group_vectors, data->group_col_count, row);
// Find or create group
HashEntry* entry = hash_table_find_or_create(data->groups, key);
AggregateState* state = (AggregateState*)entry->value;
if (state == NULL) {
// New group, initialize state
state = aggregate_state_create(data->agg_count);
entry->value = state;
// Copy group key values
for (size_t i = 0; i < data->group_col_count; i++) {
vector_get_value(group_vectors[i], row, &state->group_values[i]);
}
}
// Apply aggregates for this row
for (size_t i = 0; i < data->agg_count; i++) {
Vector* input_vector = input.vectors[data->agg_col_indices[i]];
Value input_value;
vector_get_value(input_vector, row, &input_value);
data->agg_functions[i]->update(state->aggregate_values[i], input_value);
}
}
free(group_vectors);
free_data_chunk(&input);
initialize_data_chunk(&input);
}
// Set up iteration for output
data->iterator = hash_table_iterate(data->groups);
data->output_initialized = true;
}
// Output next batch of aggregated results
const size_t max_output = STANDARD_VECTOR_SIZE;
size_t output_count = 0;
// Allocate output vectors: first the group by columns, then the aggregates
output->column_count = data->group_col_count + data->agg_count;
output->vectors = malloc(sizeof(Vector*) * output->column_count);
// Create output vectors
for (size_t i = 0; i < output->column_count; i++) {
DataType type;
if (i < data->group_col_count) {
// Group column type
type = get_group_column_type(data, i);
} else {
// Aggregate result type
type = data->agg_functions[i - data->group_col_count]->result_type;
}
output->vectors[i] = vector_create(type, max_output);
}
// Fetch results from hash table
HashEntry* entry;
while (output_count < max_output && (entry = hash_table_iterator_next(data->iterator)) != NULL) {
AggregateState* state = (AggregateState*)entry->value;
// Copy group values
for (size_t i = 0; i < data->group_col_count; i++) {
vector_set_value(output->vectors[i], output_count, state->group_values[i]);
}
// Finalize and copy aggregate values
for (size_t i = 0; i < data->agg_count; i++) {
Value result = data->agg_functions[i]->finalize(state->aggregate_values[i]);
vector_set_value(output->vectors[i + data->group_col_count], output_count, result);
}
output_count++;
}
output->count = output_count;
// Return false when no more groups
if (output_count == 0) {
free_data_chunk(output);
return false;
}
return true;
}
This approach aggregates on the fly: only one running state per distinct group is kept in the hash table, so memory grows with the number of groups rather than the number of input rows, and the full dataset is never materialized.
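To make the "one state per group" idea concrete, here is a minimal sketch of a streaming SUM/COUNT keyed by a single integer column, using a fixed-size open-addressing hash table. The GroupTable type and its functions are hypothetical simplifications (no resizing, integer keys only), not the engine's HashTable.

```c
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>

typedef struct { int64_t key; int64_t sum; uint64_t rows; int used; } GroupSlot;
typedef struct { GroupSlot* slots; size_t capacity; size_t size; } GroupTable;

/* capacity must be a power of two so the index can be masked. Returns 0 on success. */
int group_table_init(GroupTable* t, size_t capacity) {
    if (capacity == 0 || (capacity & (capacity - 1)) != 0) return -1;
    t->slots = calloc(capacity, sizeof(GroupSlot));
    t->capacity = capacity;
    t->size = 0;
    return t->slots ? 0 : -1;
}

static size_t slot_index(int64_t key, size_t capacity) {
    /* Multiplicative hash, then mask down to the table size */
    return (size_t)(((uint64_t)key * 0x9E3779B97F4A7C15ULL) >> 32) & (capacity - 1);
}

/* Fold one input row into its group. Returns -1 if the table is full. */
int group_table_add(GroupTable* t, int64_t key, int64_t value) {
    size_t idx = slot_index(key, t->capacity);
    for (size_t probes = 0; probes < t->capacity; probes++) {
        GroupSlot* s = &t->slots[idx];
        if (!s->used) {               /* first row of a new group */
            s->used = 1; s->key = key; s->sum = value; s->rows = 1;
            t->size++;
            return 0;
        }
        if (s->key == key) {          /* existing group: update running state */
            s->sum += value; s->rows++;
            return 0;
        }
        idx = (idx + 1) & (t->capacity - 1);  /* linear probing */
    }
    return -1;
}

const GroupSlot* group_table_find(const GroupTable* t, int64_t key) {
    size_t idx = slot_index(key, t->capacity);
    for (size_t probes = 0; probes < t->capacity; probes++) {
        const GroupSlot* s = &t->slots[idx];
        if (!s->used) return NULL;
        if (s->key == key) return s;
        idx = (idx + 1) & (t->capacity - 1);
    }
    return NULL;
}

void group_table_free(GroupTable* t) { free(t->slots); t->slots = NULL; }
```

Each input row touches exactly one slot and is then discarded, which is why the input stream itself never needs to be held in memory.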
Integration with Linux Pipes and Signals
To work properly in Linux environments, the engine needs to handle pipe semantics and signals correctly:
// Set up graceful termination
volatile sig_atomic_t stop_requested = 0;
void handle_signal(int sig) {
(void)sig; // Unused: both signals request the same shutdown
stop_requested = 1;
}
void setup_signal_handlers() {
struct sigaction sa;
memset(&sa, 0, sizeof(sa));
sa.sa_handler = handle_signal;
sigemptyset(&sa.sa_mask);
sigaction(SIGINT, &sa, NULL);
sigaction(SIGTERM, &sa, NULL);
}
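// Make stdin non-blocking for better pipe handling.
// Caution: with O_NONBLOCK set, stdio reads such as getline() can fail
// with errno == EAGAIN when no data is ready; callers must not treat
// that as end of input.
void setup_nonblocking_stdin() {
int flags = fcntl(STDIN_FILENO, F_GETFL, 0);
if (flags == -1) return; // Leave stdin unchanged if the flags cannot be read
fcntl(STDIN_FILENO, F_SETFL, flags | O_NONBLOCK);
}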
// Check for termination requests during processing
bool should_continue_processing() {
return !stop_requested;
}
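The handlers only set a flag. Long-running loops check it (through should_continue_processing or stop_requested directly) and unwind through their normal cleanup paths, so Ctrl+C and other termination signals stop the program without skipping resource cleanup.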
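As a small self-contained illustration of the flag-polling pattern, the sketch below installs handlers and runs a batch loop that stops at the next batch boundary once a signal has arrived. The names stop_flag, install_stop_handlers, and run_batches are hypothetical and chosen for this example only.

```c
#define _POSIX_C_SOURCE 200809L
#include <signal.h>
#include <stddef.h>
#include <string.h>

static volatile sig_atomic_t stop_flag = 0;

/* Async-signal-safe handler: it only records that a stop was requested. */
static void on_stop(int sig) {
    (void)sig;
    stop_flag = 1;
}

/* Install the handler for SIGINT and SIGTERM. Returns 0 on success. */
int install_stop_handlers(void) {
    struct sigaction sa;
    memset(&sa, 0, sizeof(sa));
    sa.sa_handler = on_stop;
    sigemptyset(&sa.sa_mask);
    if (sigaction(SIGINT, &sa, NULL) != 0) return -1;
    if (sigaction(SIGTERM, &sa, NULL) != 0) return -1;
    return 0;
}

/* Run up to batch_count batches, checking the flag between batches.
 * process may be NULL. Returns how many batches were actually processed. */
size_t run_batches(size_t batch_count, void (*process)(size_t)) {
    size_t done = 0;
    while (done < batch_count && !stop_flag) {
        if (process) process(done);
        done++;
    }
    return done;
}
```

Checking the flag only between batches keeps the handler trivial and means a batch is never abandoned halfway, at the cost of reacting to a signal one batch late at most.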
Practical Examples and Performance Tuning
Processing Server Logs
One of my most common tasks is analyzing web server logs:
# Find top client IPs by number of 404 responses
cat /var/log/nginx/access.log |
csv-sql -f csv --delimiter=' ' \
"SELECT column1 as client_ip,
COUNT(*) as count
FROM stdin
WHERE column9 = '404'
GROUP BY client_ip
ORDER BY count DESC
LIMIT 10"
System Monitoring and Analysis
I frequently use the tool to analyze system metrics:
# Calculate 95th percentile response time by endpoint
cat performance_metrics.csv |
csv-sql "SELECT endpoint,
percentile_cont(0.95) WITHIN GROUP (ORDER BY response_time) as p95
FROM stdin
GROUP BY endpoint
ORDER BY p95 DESC"
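For readers unfamiliar with percentile_cont, it is conventionally defined as a linear interpolation between the two ordered values that surround the requested fraction. The sketch below shows that calculation on a plain array of doubles; the function signature is an assumption for this example and says nothing about how the engine implements the aggregate internally.

```c
#include <stddef.h>
#include <stdlib.h>

static int compare_doubles(const void* a, const void* b) {
    double x = *(const double*)a;
    double y = *(const double*)b;
    return (x > y) - (x < y);
}

/* Continuous percentile with linear interpolation.
 * Sorts values in place. Returns 0 on success, or -1 if the input is empty
 * or fraction is outside [0, 1]. */
int percentile_cont(double* values, size_t count, double fraction, double* result) {
    if (count == 0 || !(fraction >= 0.0 && fraction <= 1.0)) return -1;
    qsort(values, count, sizeof(double), compare_doubles);
    /* Zero-based fractional position of the requested percentile */
    double rank = fraction * (double)(count - 1);
    size_t lower = (size_t)rank;
    size_t upper = (lower + 1 < count) ? lower + 1 : lower;
    double weight = rank - (double)lower;
    *result = values[lower] + weight * (values[upper] - values[lower]);
    return 0;
}
```

Because the calculation needs the values of each group in sorted order, this aggregate cannot be reduced to a constant-size running state the way SUM or COUNT can.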
Resource Utilization During Processing
Memory efficiency is critical when processing large datasets on constrained systems. I use a staged approach for operations like sorting that can't be fully streamed:
void sort_initialize(Operator* op) {
SortOperatorData* data = (SortOperatorData*)op->operator_data;
data->sorted = false;
data->current_pos = 0;
// Determine memory limits
size_t available_memory = get_available_memory() * 0.8; // Use 80% of available memory
size_t row_estimate_size = estimate_row_size(data->input_types, data->input_count);
data->max_rows = available_memory / row_estimate_size;
if (data->max_rows < MINIMUM_SORT_ROWS) {
data->max_rows = MINIMUM_SORT_ROWS; // Ensure minimum batch size
}
data->storage = vector_heap_create(data->max_rows);
}
bool sort_get_next(Operator* op, DataChunk* output) {
SortOperatorData* data = (SortOperatorData*)op->operator_data;
if (!data->sorted) {
// Load and sort data
DataChunk input;
initialize_data_chunk(&input);
while (op->left->vtable->get_next(op->left, &input)) {
// Check if we need to switch to external sort
if (data->storage->count + input.count > data->max_rows &&
!data->storage->is_external) {
// Convert to external storage
vector_heap_convert_to_external(data->storage);
}
// Add rows to storage
for (size_t i = 0; i < input.count; i++) {
VectorHeapRow row = extract_row_from_chunk(&input, i);
vector_heap_add(data->storage, row);
}
free_data_chunk(&input);
initialize_data_chunk(&input);
// Check for user interruption
if (stop_requested) {
clean_up_and_exit(op);
return false;
}
}
// Sort the data
vector_heap_sort(data->storage, data->sort_expressions, data->sort_directions,
data->expr_count);
data->sorted = true;
}
// Output next batch of sorted data
size_t output_count = 0;
const size_t max_output = STANDARD_VECTOR_SIZE;
// Set up output chunk
initialize_data_chunk(output);
output->column_count = data->input_count;
output->vectors = malloc(sizeof(Vector*) * output->column_count);
for (size_t i = 0; i < output->column_count; i++) {
output->vectors[i] = vector_create(data->input_types[i], max_output);
}
// Fetch sorted rows
while (output_count < max_output && data->current_pos < data->storage->count) {
VectorHeapRow row = vector_heap_get(data->storage, data->current_pos);
for (size_t i = 0; i < output->column_count; i++) {
vector_set_value(output->vectors[i], output_count, row.values[i]);
}
data->current_pos++;
output_count++;
}
output->count = output_count;
return output_count > 0;
}
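The code above leaves the external part of the sort inside vector_heap_convert_to_external and vector_heap_sort. A common way to structure an external sort is to write sorted runs and then merge them; the sketch below shows only the merge step, with in-memory arrays standing in for spilled runs. SortedRun and merge_runs are hypothetical names, and a real implementation with many runs would likely use a heap instead of a linear scan.

```c
#include <stddef.h>
#include <stdint.h>

/* One already-sorted run; in a real external sort this would be a spill file. */
typedef struct {
    const int64_t* values;
    size_t count;
    size_t pos;      /* next unread element */
} SortedRun;

/* Merge the runs into out in ascending order.
 * Writes at most out_capacity values and returns how many were written. */
size_t merge_runs(SortedRun* runs, size_t run_count, int64_t* out, size_t out_capacity) {
    size_t written = 0;
    while (written < out_capacity) {
        size_t best = run_count;   /* sentinel: no run has rows left */
        for (size_t r = 0; r < run_count; r++) {
            if (runs[r].pos >= runs[r].count) continue;   /* run exhausted */
            if (best == run_count ||
                runs[r].values[runs[r].pos] < runs[best].values[runs[best].pos]) {
                best = r;
            }
        }
        if (best == run_count) break;   /* every run is exhausted */
        out[written++] = runs[best].values[runs[best].pos++];
    }
    return written;
}
```

Since the merge only looks at the head of each run, it needs memory proportional to the number of runs rather than the number of rows, which is what makes sorting beyond available memory feasible.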
Performance Optimizations
Columnar Data Processing
To achieve better CPU cache utilization, I use a columnar data representation:
typedef struct {
DataType type;
void* data; // Raw type-specific data
bool* nullmask; // One bool per row; true marks a NULL value
size_t capacity; // Allocated capacity
} Vector;
typedef struct {
size_t column_count;
Vector** vectors;
size_t count; // Number of rows
} DataChunk;
Vector* vector_create(DataType type, size_t capacity) {
Vector* vector = malloc(sizeof(Vector));
vector->type = type;
vector->capacity = capacity;
// Allocate data array based on type
switch (type) {
case TYPE_INTEGER:
vector->data = malloc(sizeof(int64_t) * capacity);
break;
case TYPE_FLOAT:
vector->data = malloc(sizeof(double) * capacity);
break;
case TYPE_VARCHAR:
// For strings, we store char* pointers
vector->data = malloc(sizeof(char*) * capacity);
memset(vector->data, 0, sizeof(char*) * capacity);
break;
case TYPE_BOOLEAN:
vector->data = malloc(sizeof(bool) * capacity);
break;
case TYPE_DATE:
vector->data = malloc(sizeof(date_t) * capacity);
break;
default:
vector->data = NULL; // Unknown type: no storage allocated
break;
}
// Allocate NULL mask
vector->nullmask = malloc(sizeof(bool) * capacity);
memset(vector->nullmask, 0, sizeof(bool) * capacity);
return vector;
}
Because each column is stored contiguously, a query that touches only a few columns reads only those arrays. On analytical queries this makes better use of CPU caches than row-based processing, where every row drags all of its columns through memory.
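The nullmask above spends one bool per row. If that overhead matters, a packed bitmap that uses one bit per row is a common alternative. The following is a minimal sketch of such a bitmap; NullBitmap and its functions are hypothetical and not part of the Vector type shown here.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>

/* One bit per row, packed into 64-bit words; a set bit marks a NULL value. */
typedef struct {
    uint64_t* words;
    size_t capacity;   /* number of rows the bitmap can describe */
} NullBitmap;

bool null_bitmap_init(NullBitmap* b, size_t capacity) {
    size_t word_count = (capacity + 63) / 64;
    if (word_count == 0) word_count = 1;
    b->words = calloc(word_count, sizeof(uint64_t));   /* all rows start non-NULL */
    b->capacity = capacity;
    return b->words != NULL;
}

void null_bitmap_set(NullBitmap* b, size_t row) {
    b->words[row / 64] |= (uint64_t)1 << (row % 64);
}

bool null_bitmap_is_null(const NullBitmap* b, size_t row) {
    return ((b->words[row / 64] >> (row % 64)) & 1u) != 0;
}

void null_bitmap_free(NullBitmap* b) {
    free(b->words);
    b->words = NULL;
}
```

The trade-off is a little bit arithmetic on every access in exchange for an eightfold smaller mask, and whole words can be tested at once to skip runs of rows that contain no NULLs.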
SIMD Optimizations for Filter Operations
For filtering operations on numeric data, I use SIMD instructions when available:
void comparison_equals_integers_simd(Vector* left, Vector* right, Vector* result, size_t count) {
int64_t* left_data = (int64_t*)left->data;
int64_t* right_data = (int64_t*)right->data;
bool* result_data = (bool*)result->data;
#ifdef __AVX2__
// Use AVX2 if available
size_t i = 0;
for (; i + 4 <= count; i += 4) {
__m256i left_vec = _mm256_loadu_si256((__m256i*)&left_data[i]);
__m256i right_vec = _mm256_loadu_si256((__m256i*)&right_data[i]);
__m256i cmp_result = _mm256_cmpeq_epi64(left_vec, right_vec);
// Collapse the four 64-bit lane results into a 4-bit mask
// (bit n is set when lane n compared equal)
int mask = _mm256_movemask_pd(_mm256_castsi256_pd(cmp_result));
result_data[i] = (mask & 1) != 0;
result_data[i+1] = (mask & 2) != 0;
result_data[i+2] = (mask & 4) != 0;
result_data[i+3] = (mask & 8) != 0;
}
// Process remaining elements
for (; i < count; i++) {
result_data[i] = left_data[i] == right_data[i];
}
#else
// Fallback to scalar code
for (size_t i = 0; i < count; i++) {
result_data[i] = left_data[i] == right_data[i];
}
#endif
// Handle nulls
for (size_t i = 0; i < count; i++) {
if (left->nullmask[i] || right->nullmask[i]) {
result->nullmask[i] = true;
result_data[i] = false;
} else {
result->nullmask[i] = false;
}
}
}
Parallel Query Execution
For multi-core systems, I implemented parallel execution strategies:
typedef struct {
Operator* original_op; // Original operator tree
Operator** parallel_ops; // Per-thread operator trees
size_t thread_count; // Number of threads
pthread_t* threads; // Thread handles
Queue** result_queues; // Per-thread result queues
bool* thread_finished; // Thread status flags
pthread_mutex_t mutex; // Synchronization mutex
size_t next_result_thread; // Next thread to get results from
} ParallelScanOperatorData;
void* parallel_scan_thread(void* arg) {
ThreadContext* ctx = (ThreadContext*)arg;
Operator* op = ctx->operator;
Queue* queue = ctx->result_queue;
DataChunk chunk;
initialize_data_chunk(&chunk);
while (op->vtable->get_next(op, &chunk)) {
// Clone the chunk for thread safety
DataChunk* chunk_copy = clone_data_chunk(&chunk);
// Add to result queue
queue_push(queue, chunk_copy);
initialize_data_chunk(&chunk);
// Check for termination request
if (stop_requested) {
break;
}
}
// Signal that this thread is done
pthread_mutex_lock(&ctx->parent->mutex);
ctx->parent->thread_finished[ctx->thread_id] = true;
pthread_mutex_unlock(&ctx->parent->mutex);
return NULL;
}
bool parallel_scan_get_next(Operator* op, DataChunk* output) {
ParallelScanOperatorData* data = (ParallelScanOperatorData*)op->operator_data;
// Initialize threads on first call
if (data->threads == NULL) {
initialize_parallel_scan(data);
}
while (true) {
pthread_mutex_lock(&data->mutex);
// Check if we have a result from the current thread
Queue* current_queue = data->result_queues[data->next_result_thread];
DataChunk* result = queue_pop(current_queue);
if (result != NULL) {
// We have a result, copy it to output
copy_data_chunk(result, output);
free_data_chunk(result);
free(result);
pthread_mutex_unlock(&data->mutex);
return true;
}
// No result from current thread, check if it's finished
if (data->thread_finished[data->next_result_thread]) {
// This thread is finished and its queue is drained
if (data->next_result_thread + 1 >= data->thread_count) {
// That was the last thread, so every queue has been emptied
pthread_mutex_unlock(&data->mutex);
return false;
}
// Move on to the next thread's queue without sleeping, so chunks
// still waiting in later queues are not dropped
data->next_result_thread++;
pthread_mutex_unlock(&data->mutex);
continue;
}
pthread_mutex_unlock(&data->mutex);
// No results available yet, wait a bit
usleep(1000); // 1ms sleep to avoid spinning
}
}
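The code above does not show how the input is divided among the per-thread operator trees. For a seekable input with a known row count, one simple option is to give each thread a contiguous range of rows. The sketch below computes such ranges; RowRange and partition_rows are hypothetical names, and this is only one of several possible partitioning schemes.

```c
#include <stddef.h>

/* Half-open range [begin, end) of rows assigned to one thread. */
typedef struct {
    size_t begin;
    size_t end;
} RowRange;

/* Split total_rows into thread_count contiguous ranges whose sizes differ by
 * at most one row. Returns an empty range for invalid arguments. */
RowRange partition_rows(size_t total_rows, size_t thread_count, size_t thread_id) {
    RowRange range = {0, 0};
    if (thread_count == 0 || thread_id >= thread_count) return range;
    size_t base = total_rows / thread_count;    /* rows every thread gets */
    size_t extra = total_rows % thread_count;   /* first `extra` threads get one more */
    range.begin = thread_id * base + (thread_id < extra ? thread_id : extra);
    range.end = range.begin + base + (thread_id < extra ? 1 : 0);
    return range;
}
```

Contiguous ranges also pair naturally with draining the result queues in thread order, because concatenating the per-thread outputs then reproduces the original row order.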
Production-Grade Considerations
Error Handling and Resilience
To ensure reliability in production environments, robust error handling is essential:
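// TRY_GOTO expects a local variable named `result` in the calling function
// and stores the error code there so the failure reaches the caller
#define TRY_GOTO(expr, label) \
do { \
int try_error_ = (expr); \
if (try_error_) { \
fprintf(stderr, "Error at %s:%d: %s\n", __FILE__, __LINE__, error_message(try_error_)); \
result = try_error_; \
goto label; \
} \
} while (0)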
int execute_query_with_error_handling(DatabaseInstance* db, const char* query) {
int result = 0;
QueryPlan* plan = NULL;
Operator* root_op = NULL;
// Parse and prepare query
TRY_GOTO(parse_query(query, &plan), cleanup);
TRY_GOTO(prepare_query(db, plan), cleanup);
TRY_GOTO(optimize_query(db, plan), cleanup);
TRY_GOTO(build_execution_tree(db, plan, &root_op), cleanup);
// Execute query
result = execute_query_tree(db, root_op);
cleanup:
if (root_op) close_operator_tree(root_op);
if (plan) free_query_plan(plan);
return result;
}
Resource Management
Careful management of system resources is critical:
void configure_resource_limits(ResourceLimits* limits) {
// Determine system resources
struct sysinfo info;
sysinfo(&info);
// Calculate memory limits
unsigned long total_mem = info.totalram * info.mem_unit;
unsigned long mem_limit = MIN(total_mem * 0.7, MAX_MEMORY_USAGE);
// Set limits
limits->memory_limit = mem_limit;
limits->max_threads = MIN(get_nprocs(), MAX_THREADS);
// Print configuration
fprintf(stderr, "Configured with:\n");
fprintf(stderr, " Memory limit: %lu MB\n", limits->memory_limit / (1024*1024));
fprintf(stderr, " Max threads: %d\n", limits->max_threads);
}
// Track memory usage
void* tracked_malloc(size_t size) {
pthread_mutex_lock(&memory_mutex);
if (current_memory_usage + size > memory_limit) {
pthread_mutex_unlock(&memory_mutex);
throw_out_of_memory_error();
return NULL;
}
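void* ptr = malloc(size);
if (ptr) {
// Store size for later free
size_t* size_ptr = malloc(sizeof(size_t));
if (size_ptr == NULL) {
// Cannot record the allocation, so fail it rather than lose track of it
free(ptr);
pthread_mutex_unlock(&memory_mutex);
return NULL;
}
*size_ptr = size;
current_memory_usage += size;
hash_table_insert(allocation_table, ptr, size_ptr);
}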
pthread_mutex_unlock(&memory_mutex);
return ptr;
}
void tracked_free(void* ptr) {
if (!ptr) return;
pthread_mutex_lock(&memory_mutex);
size_t* size_ptr = hash_table_lookup(allocation_table, ptr);
if (size_ptr) {
current_memory_usage -= *size_ptr;
hash_table_remove(allocation_table, ptr);
free(size_ptr);
}
free(ptr);
pthread_mutex_unlock(&memory_mutex);
}
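An alternative to keeping a side hash table of allocation sizes is to store the size in a small header in front of each block. The sketch below shows that variant under a fixed budget. It is a single-threaded illustration with hypothetical names (budget_malloc, budget_free, and friends); a multi-threaded version would need the same mutex protection as the code above.

```c
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>

/* Header placed in front of every block; the union keeps the user pointer
 * aligned for any type. */
typedef union {
    size_t size;
    max_align_t align;
} AllocHeader;

static size_t budget_used = 0;
static size_t budget_limit = 0;

void budget_set_limit(size_t limit) { budget_limit = limit; }
size_t budget_in_use(void) { return budget_used; }

/* Allocate size bytes if the budget allows it, otherwise return NULL. */
void* budget_malloc(size_t size) {
    if (budget_used > budget_limit || size > budget_limit - budget_used) return NULL;
    if (size > SIZE_MAX - sizeof(AllocHeader)) return NULL;   /* avoid overflow */
    AllocHeader* header = malloc(sizeof(AllocHeader) + size);
    if (!header) return NULL;
    header->size = size;        /* remembered for budget_free */
    budget_used += size;
    return header + 1;          /* user memory starts right after the header */
}

/* Release a block obtained from budget_malloc and return its bytes to the budget. */
void budget_free(void* ptr) {
    if (!ptr) return;
    AllocHeader* header = (AllocHeader*)ptr - 1;
    budget_used -= header->size;
    free(header);
}
```

The header costs a few bytes per allocation but removes the extra malloc and hash-table lookup on every allocate and free.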
Conclusion
Building a command-line SQL engine for Linux data pipelines has been a challenging but rewarding project. The architecture described here provides a powerful tool for data analysis directly within the shell environment, enabling sophisticated data manipulation without the overhead of a traditional database.
The key insights from this implementation are:
Columnar data representation dramatically improves performance for analytical workloads
Vectorized execution provides efficient CPU utilization
Stream processing enables handling datasets larger than available memory
Unix integration makes the tool feel like a natural part of the shell environment
With these techniques, the engine can process gigabytes of data efficiently even on modest hardware, making it an invaluable addition to any Linux systems engineer's toolkit.
I will continue to explore and expand on this tool and share my findings in future articles.