Streaming Support

hypequery provides streaming support for efficient processing of large datasets. This is particularly useful for result sets that are too large to load into memory all at once.

Note: hypequery also provides comprehensive logging for streaming operations. For details on logging capabilities, see the Logging documentation.

Basic Usage

The streaming API uses the Web Streams API standard, providing a modern and efficient way to process data streams. Here’s how to use it:

const builder = createQueryBuilder(schema).table('users');

// Get a ReadableStream of results - note that stream() is async
const stream = await builder
  .select(['id', 'name', 'email'])
  .where('age', 'gt', 18)
  .stream();

// Process the stream
const reader = stream.getReader();

try {
  while (true) {
    const { done, value: rows } = await reader.read();
    if (done) break;
    
    // Process each batch of rows
    rows.forEach(row => {
      console.log(row);
    });
  }
  console.log('Completed!');
} catch (error) {
  console.error('Error processing stream:', error);
} finally {
  reader.releaseLock();
}

Key Concepts

  1. Async Stream Method: The stream() method returns a Promise that resolves to a ReadableStream
  2. Reader: The getReader() method returns a reader that allows you to consume the stream
  3. Reading Data: The read() method returns a promise that resolves to an object with:
    • done: boolean indicating if the stream is finished
    • value: array of rows from the current batch
  4. Resource Management: Always use try/finally and call reader.releaseLock() when done
  5. Error Handling: Use try/catch to handle any errors during stream processing
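
In runtimes where ReadableStream is async-iterable (modern Node.js; browser support is more recent), you can consume the stream with for await...of instead of managing a reader by hand. The loop releases the lock automatically and cancels the stream if you break out early. A minimal sketch, assuming your runtime supports async iteration over web streams:

const stream = await builder
  .select(['id', 'name', 'email'])
  .where('age', 'gt', 18)
  .stream();

// Each iteration yields one batch of rows
for await (const rows of stream) {
  rows.forEach(row => {
    console.log(row);
  });
}
console.log('Completed!');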

Alternative: Event-Based Approach

If you prefer an event-based approach similar to Node.js streams, you can create a wrapper function:

import { EventEmitter } from 'events';

async function streamToEvents<T>(streamPromise: Promise<ReadableStream<T[]>>) {
  const stream = await streamPromise;
  const reader = stream.getReader();
  const emitter = new EventEmitter();

  // Pump the stream in the background, re-emitting each batch as a 'data' event.
  // Attach your listeners as soon as this function returns.
  (async () => {
    try {
      while (true) {
        const { done, value: rows } = await reader.read();
        if (done) {
          emitter.emit('end');
          break;
        }
        emitter.emit('data', rows);
      }
    } catch (error) {
      emitter.emit('error', error);
    } finally {
      reader.releaseLock();
    }
  })();

  return emitter;
}

// Usage:
const streamPromise = builder.stream();
const emitter = await streamToEvents(streamPromise);

emitter.on('data', (rows: any[]) => {
  rows.forEach(row => {
    console.log(row);
  });
});

emitter.on('end', () => {
  console.log('Completed!');
});

emitter.on('error', (error) => {
  console.error('Error:', error);
});

Note: While the event-based approach is available, we recommend the native Web Streams API: it gives you direct control over when each batch is read (preserving backpressure) and avoids the extra wrapper layer.

Helper Functions

For convenience, you might want to create helper functions to work with streams:

// Process each row in a stream
async function streamForEach<T>(
  streamPromise: Promise<ReadableStream<T[]>>,
  callback: (row: T) => void | Promise<void>
): Promise<void> {
  const stream = await streamPromise;
  const reader = stream.getReader();
  
  try {
    while (true) {
      const { done, value: rows } = await reader.read();
      if (done) break;
      
      for (const row of rows) {
        await callback(row);
      }
    }
  } finally {
    reader.releaseLock();
  }
}

// Usage:
await streamForEach(
  builder.select(['id', 'name']).stream(),
  user => console.log(user.name)
);
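
Along the same lines, a hypothetical streamReduce helper (the name and signature are illustrative, not part of hypequery's API) lets you compute aggregates over a stream without ever holding the full result set in memory:

// Reduce over every row in a stream without buffering rows.
// Illustrative helper, not part of hypequery's API.
async function streamReduce<T, A>(
  streamPromise: Promise<ReadableStream<T[]>>,
  reducer: (acc: A, row: T) => A,
  initial: A
): Promise<A> {
  const stream = await streamPromise;
  const reader = stream.getReader();
  let acc = initial;

  try {
    while (true) {
      const { done, value: rows } = await reader.read();
      if (done) break;

      for (const row of rows) {
        acc = reducer(acc, row);
      }
    }
  } finally {
    reader.releaseLock();
  }

  return acc;
}

// Usage: count rows without accumulating them
const total = await streamReduce(
  builder.select(['id']).stream(),
  count => count + 1,
  0
);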

Benefits of Streaming

  1. Memory Efficiency: Process large datasets without loading everything into memory at once
  2. Real-time Processing: Start processing data as soon as it’s available
  3. Better Resource Management: Release resources as soon as they’re no longer needed

Example Use Cases

Processing Large Datasets

const stream = await builder
  .select(['id', 'name', 'created_at'])
  .where('created_at', 'gt', new Date('2024-01-01'))
  .stream();

let totalUsers = 0;
const reader = stream.getReader();

try {
  while (true) {
    const { done, value: rows } = await reader.read();
    if (done) break;
    
    totalUsers += rows.length;
    // Process each batch of users
    rows.forEach(user => {
      // Do something with each user
    });
  }
  console.log(`Total users processed: ${totalUsers}`);
} finally {
  reader.releaseLock();
}

Real-time Data Processing

const stream = await builder
  .select(['timestamp', 'event_type', 'data'])
  .where('timestamp', 'gt', new Date(Date.now() - 3600000)) // Last hour
  .stream();

const reader = stream.getReader();

try {
  while (true) {
    const { done, value: events } = await reader.read();
    if (done) break;
    
    // Process events in real-time
    events.forEach(event => {
      switch (event.event_type) {
        case 'user_login':
          handleUserLogin(event);
          break;
        case 'purchase':
          handlePurchase(event);
          break;
        // ... handle other event types
      }
    });
  }
} finally {
  reader.releaseLock();
}

Best Practices

  1. Always Process Streams: Fully consume the stream, or cancel it explicitly when you stop early, so the underlying resources are released (see the sketch below)
  2. Error Handling: Implement proper error handling for stream processing
  3. Resource Cleanup: Always release the reader lock when done processing
  4. Batch Processing: Process rows in batches as they arrive rather than accumulating them
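
If you need to stop before the stream is exhausted, cancel it through the reader rather than simply abandoning it; reader.cancel() is part of the standard Web Streams API. A minimal sketch, assuming you want to stop after a fixed number of rows:

const stream = await builder.select(['id', 'name']).stream();
const reader = stream.getReader();
let processed = 0;

try {
  while (true) {
    const { done, value: rows } = await reader.read();
    if (done) break;

    processed += rows.length;
    if (processed >= 10000) {
      // Signal that no more data is needed; this lets the
      // stream release its underlying resources without
      // draining the remaining batches.
      await reader.cancel();
      break;
    }
  }
} finally {
  reader.releaseLock();
}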

Limitations

  1. Streaming is only available in environments that support the Web Streams API (you can feature-detect this; see the check below)
  2. The stream must be consumed in order (no random access)
  3. Each chunk of data must be processed before moving to the next
  4. The reader lock must be released before another reader can be obtained
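
Regarding the first limitation, a simple runtime check can guard against environments without Web Streams support before you attempt to stream:

// Feature-detect the Web Streams API before streaming
if (typeof ReadableStream === 'undefined') {
  throw new Error('This environment does not support the Web Streams API');
}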

Advanced Connection Options

hypequery supports additional connection options provided by the underlying ClickHouse client. Use these options to customize your connection:

HTTP Headers

You can add custom HTTP headers to your ClickHouse requests, which is useful for authentication with reverse proxies or passing custom information:

const builder = createQueryBuilder({
  host: 'http://localhost:8123',
  username: 'default',
  password: 'password',
  database: 'default',
  http_headers: {
    'X-Custom-Header': 'custom-value',
    'Authorization': 'Bearer your-token'
  }
});

Request Timeout

You can customize the request timeout (in milliseconds) for your ClickHouse queries:

const builder = createQueryBuilder({
  host: 'http://localhost:8123',
  username: 'default',
  password: 'password',
  database: 'default',
  request_timeout: 60000 // 60 seconds
});

Compression

Enable compression for requests and responses to improve performance with large datasets:

const builder = createQueryBuilder({
  host: 'http://localhost:8123',
  username: 'default',
  password: 'password',
  database: 'default',
  compression: {
    request: true,   // Compress request bodies
    response: true   // Decompress response bodies
  }
});

Application Name

Set a custom application name to identify your application in ClickHouse logs:

const builder = createQueryBuilder({
  host: 'http://localhost:8123',
  username: 'default',
  password: 'password',
  database: 'default',
  application: 'MyAnalyticsApp'
});

Keep Alive

Configure the keep-alive behavior for the connection:

const builder = createQueryBuilder({
  host: 'http://localhost:8123',
  username: 'default',
  password: 'password',
  database: 'default',
  keep_alive: {
    enabled: true
  }
});

Custom Logging

Configure custom logging for the ClickHouse client:

const builder = createQueryBuilder({
  host: 'http://localhost:8123',
  username: 'default',
  password: 'password',
  database: 'default',
  log: {
    level: 'debug' // 'trace', 'debug', 'info', 'warn', 'error'
  }
});

ClickHouse Settings

Pass custom ClickHouse settings to control query execution behavior:

const builder = createQueryBuilder({
  host: 'http://localhost:8123',
  username: 'default',
  password: 'password',
  database: 'default',
  clickhouse_settings: {
    max_execution_time: 30,
    max_block_size: 10000,
    async_insert: 1,
    wait_for_async_insert: 1
  }
});