Streaming

hypequery provides streaming support for efficient processing of large datasets. This feature is particularly useful when dealing with large result sets that would be too memory-intensive to load entirely into memory at once.

Basic Usage

The streaming API uses the Web Streams API standard, providing a modern and efficient way to process data streams:

import { createQueryBuilder } from '@hypequery/clickhouse';

const db = createQueryBuilder<Schema>();

// Get a ReadableStream of results
const stream = await db
  .table('users')
  .select(['id', 'name', 'email'])
  .where('age', 'gt', 18)
  .stream();

// Process the stream
const reader = stream.getReader();

try {
  while (true) {
    const { done, value: rows } = await reader.read();
    if (done) break;
    
    // Process each batch of rows
    rows.forEach(row => {
      console.log(row);
    });
  }
} finally {
  reader.releaseLock();
}

Type Definitions

stream(): Promise<ReadableStream<T[]>>
streamForEach<R = void>(callback: (row: T) => R | Promise<R>): Promise<void>

Returns

  • stream(): Returns a Promise that resolves to a ReadableStream of row arrays
  • streamForEach(): Processes each row with a callback function and returns a Promise

Examples

Basic Streaming

// Stream users with filtering
const stream = await db
  .table('users')
  .select(['id', 'name', 'email'])
  .where('status', 'eq', 'active')
  .stream();

const reader = stream.getReader();
const results: any[] = [];

try {
  while (true) {
    const { done, value: rows } = await reader.read();
    if (done) break;
    results.push(...rows);
  }
} finally {
  reader.releaseLock();
}

Using streamForEach Helper

// Process each row individually using the built-in helper
await db
  .table('users')
  .select(['id', 'name'])
  .streamForEach(async (user) => {
    console.log(`Processing user: ${user.name}`);
    // Process each user
  });

Streaming with Joins

// Stream results with joins
const stream = await db
  .table('orders')
  .leftJoin('users', 'orders.user_id', 'users.id')
  .select(['orders.id', 'orders.total', 'users.name'])
  .stream();

const reader = stream.getReader();

try {
  while (true) {
    const { done, value: rows } = await reader.read();
    if (done) break;
    
    rows.forEach(order => {
      console.log(`Order ${order.id}: $${order.total} by ${order.name}`);
    });
  }
} finally {
  reader.releaseLock();
}

Key Concepts

Async Stream Method

The stream() method returns a Promise that resolves to a ReadableStream.

Reader Pattern

  • Use getReader() to get a reader for consuming the stream
  • Call read() to get the next batch of data
  • Always call releaseLock() when done

Data Structure

Each read() call returns:

  • done: boolean indicating if the stream is finished
  • value: array of rows from the current batch