Streaming
hypequery provides streaming support for efficient processing of large datasets. This feature is particularly useful when dealing with large result sets that would be too memory-intensive to load entirely into memory at once.
Basic Usage
The streaming API uses the Web Streams API standard, providing a modern and efficient way to process data streams:
import { createQueryBuilder } from '@hypequery/clickhouse';
const db = createQueryBuilder<Schema>();
// Get a ReadableStream of results
const stream = await db
.table('users')
.select(['id', 'name', 'email'])
.where('age', 'gt', 18)
.stream();
// Process the stream
const reader = stream.getReader();
try {
while (true) {
const { done, value: rows } = await reader.read();
if (done) break;
// Process each batch of rows
rows.forEach(row => {
console.log(row);
});
}
} finally {
reader.releaseLock();
}
Type Definitions
stream(): Promise<ReadableStream<T[]>>
streamForEach<R = void>(callback: (row: T) => R | Promise<R>): Promise<void>
Returns
stream()
: Returns a Promise that resolves to a ReadableStream of row arraysstreamForEach()
: Processes each row with a callback function and returns a Promise
Examples
Basic Streaming
// Stream users with filtering
const stream = await db
.table('users')
.select(['id', 'name', 'email'])
.where('status', 'eq', 'active')
.stream();
const reader = stream.getReader();
const results: any[] = [];
try {
while (true) {
const { done, value: rows } = await reader.read();
if (done) break;
results.push(...rows);
}
} finally {
reader.releaseLock();
}
Using streamForEach Helper
// Process each row individually using the built-in helper
await db
.table('users')
.select(['id', 'name'])
.streamForEach(async (user) => {
console.log(`Processing user: ${user.name}`);
// Process each user
});
Streaming with Joins
// Stream results with joins
const stream = await db
.table('orders')
.leftJoin('users', 'orders.user_id', 'users.id')
.select(['orders.id', 'orders.total', 'users.name'])
.stream();
const reader = stream.getReader();
try {
while (true) {
const { done, value: rows } = await reader.read();
if (done) break;
rows.forEach(order => {
console.log(`Order ${order.id}: $${order.total} by ${order.name}`);
});
}
} finally {
reader.releaseLock();
}
Key Concepts
Async Stream Method
The stream()
method returns a Promise that resolves to a ReadableStream.
Reader Pattern
- Use
getReader()
to get a reader for consuming the stream - Call
read()
to get the next batch of data - Always call
releaseLock()
when done
Data Structure
Each read()
call returns:
done
: boolean indicating if the stream is finishedvalue
: array of rows from the current batch