Streaming Support
hypequery provides streaming support for efficient processing of large datasets. This is particularly useful for result sets that are too large to load into memory at once.
Note: hypequery also provides comprehensive logging for streaming operations. For details on logging capabilities, see the Logging documentation.
Basic Usage
The streaming API uses the Web Streams API standard, providing a modern and efficient way to process data streams. Here’s how to use it:
const builder = createQueryBuilder(schema).table('users');

// Get a ReadableStream of results - note that stream() is async
const stream = await builder
  .select(['id', 'name', 'email'])
  .where('age', 'gt', 18)
  .stream();

// Process the stream
const reader = stream.getReader();

try {
  while (true) {
    const { done, value: rows } = await reader.read();
    if (done) break;

    // Process each batch of rows
    rows.forEach(row => {
      console.log(row);
    });
  }
  console.log('Completed!');
} catch (error) {
  console.error('Error processing stream:', error);
} finally {
  reader.releaseLock();
}
Key Concepts
- Async Stream Method: The `stream()` method returns a Promise that resolves to a ReadableStream
- Reader: The `getReader()` method returns a reader that allows you to consume the stream
- Reading Data: The `read()` method returns a promise with `done` (a boolean indicating if the stream is finished) and `value` (an array of rows from the current batch)
- Resource Management: Always use `try/finally` and call `reader.releaseLock()` when done
- Error Handling: Use `try/catch` to handle any errors during stream processing
Alternative: Event-Based Approach
If you prefer an event-based approach similar to Node.js streams, you can create a wrapper function:
import { EventEmitter } from 'events';

async function streamToEvents<T>(streamPromise: Promise<ReadableStream<T[]>>) {
  const stream = await streamPromise;
  const reader = stream.getReader();
  const emitter = new EventEmitter();

  (async () => {
    try {
      while (true) {
        const { done, value: rows } = await reader.read();
        if (done) {
          emitter.emit('end');
          break;
        }
        emitter.emit('data', rows);
      }
    } catch (error) {
      emitter.emit('error', error);
    } finally {
      reader.releaseLock();
    }
  })();

  return emitter;
}
// Usage:
const streamPromise = builder.stream();
const emitter = await streamToEvents(streamPromise);

emitter.on('data', (rows) => {
  rows.forEach(row => {
    console.log(row);
  });
});

emitter.on('end', () => {
  console.log('Completed!');
});

emitter.on('error', (error) => {
  console.error('Error:', error);
});
Note: While the event-based approach is available, we recommend using the native Web Streams API as it provides better control over the streaming process and is more efficient.
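If your runtime exposes ReadableStream as an async iterable (Node.js 18+ does; browser and TypeScript lib support varies, so treat this as environment-dependent), you can also consume batches with for await...of and skip manual reader management entirely; a minimal sketch:

// Assumes ReadableStream implements Symbol.asyncIterator in your
// environment (true in Node.js 18+; check browser/TS lib support first).
const stream = await builder
  .select(['id', 'name', 'email'])
  .stream();

for await (const rows of stream) {
  // Each iteration yields one batch of rows, like reader.read()'s value
  rows.forEach(row => console.log(row));
}

With async iteration, the reader lock is acquired and released for you when the loop exits, including on early break or thrown errors.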
Helper Functions
For convenience, you might want to create helper functions to work with streams:
// Process each row in a stream
async function streamForEach<T>(
  streamPromise: Promise<ReadableStream<T[]>>,
  callback: (row: T) => void | Promise<void>
): Promise<void> {
  const stream = await streamPromise;
  const reader = stream.getReader();

  try {
    while (true) {
      const { done, value: rows } = await reader.read();
      if (done) break;
      for (const row of rows) {
        await callback(row);
      }
    }
  } finally {
    reader.releaseLock();
  }
}
// Usage:
await streamForEach(
  builder.table('users').select(['id', 'name']).stream(),
  user => console.log(user.name)
);
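Along the same lines, a helper that buffers the whole stream into an array can be convenient for tests or known-small result sets. Note that it gives up the memory benefits of streaming, so this sketch is only appropriate when the result set is small:

// Collect all batches into a single array.
// Warning: this buffers everything in memory; only use it when the
// result set is known to be small.
async function streamToArray<T>(
  streamPromise: Promise<ReadableStream<T[]>>
): Promise<T[]> {
  const stream = await streamPromise;
  const reader = stream.getReader();
  const results: T[] = [];

  try {
    while (true) {
      const { done, value: rows } = await reader.read();
      if (done) break;
      results.push(...rows);
    }
  } finally {
    reader.releaseLock();
  }

  return results;
}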
Benefits of Streaming
- Memory Efficiency: Process large datasets without loading everything into memory at once
- Real-time Processing: Start processing data as soon as it’s available
- Better Resource Management: Release resources as soon as they’re no longer needed
Example Use Cases
Processing Large Datasets
const stream = await builder
  .select(['id', 'name', 'created_at'])
  .where('created_at', 'gt', new Date('2024-01-01'))
  .stream();
let totalUsers = 0;
const reader = stream.getReader();

try {
  while (true) {
    const { done, value: rows } = await reader.read();
    if (done) break;

    totalUsers += rows.length;

    // Process each batch of users
    rows.forEach(user => {
      // Do something with each user
    });
  }
  console.log(`Total users processed: ${totalUsers}`);
} finally {
  reader.releaseLock();
}
Real-time Data Processing
const stream = await builder
  .select(['timestamp', 'event_type', 'data'])
  .where('timestamp', 'gt', new Date(Date.now() - 3600000)) // Last hour
  .stream();
const reader = stream.getReader();

try {
  while (true) {
    const { done, value: events } = await reader.read();
    if (done) break;

    // Process events in real-time
    events.forEach(event => {
      switch (event.event_type) {
        case 'user_login':
          handleUserLogin(event);
          break;
        case 'purchase':
          handlePurchase(event);
          break;
        // ... handle other event types
      }
    });
  }
} finally {
  reader.releaseLock();
}
Best Practices
- Always Process Streams: Fully consume the stream, or cancel it explicitly, to avoid memory leaks (see the early-cancellation sketch after this list)
- Error Handling: Implement proper error handling for stream processing
- Resource Cleanup: Always release the reader lock when done processing
- Batch Processing: Process rows in batches as they arrive rather than accumulating them
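If you decide partway through that you no longer need the remaining data, the Web Streams API lets you cancel via the reader rather than silently abandoning the stream. A minimal sketch, where `shouldStop` and `process` are hypothetical placeholders for your own logic:

const reader = stream.getReader();

try {
  while (true) {
    const { done, value: rows } = await reader.read();
    if (done) break;

    if (shouldStop(rows)) {
      // cancel() signals the source to release underlying resources;
      // the reason string is optional and arbitrary
      await reader.cancel('consumer stopped early');
      break;
    }

    process(rows); // hypothetical batch handler
  }
} finally {
  reader.releaseLock();
}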
Limitations
- Streaming is only available in environments that support the Web Streams API
- The stream must be consumed in order (no random access)
- Each chunk of data must be processed before moving to the next
- The reader lock must be released before another reader can be obtained
Advanced Connection Options
hypequery supports additional connection options provided by the underlying ClickHouse client. These options can be used to customize your connection:
HTTP Headers
You can add custom HTTP headers to your ClickHouse requests, which is useful for authentication with reverse proxies or passing custom information:
const builder = createQueryBuilder({
  host: 'http://localhost:8123',
  username: 'default',
  password: 'password',
  database: 'default',
  http_headers: {
    'X-Custom-Header': 'custom-value',
    'Authorization': 'Bearer your-token'
  }
});
Request Timeout
You can customize the request timeout (in milliseconds) for your ClickHouse queries:
const builder = createQueryBuilder({
  host: 'http://localhost:8123',
  username: 'default',
  password: 'password',
  database: 'default',
  request_timeout: 60000 // 60 seconds
});
Compression
Enable compression for requests and responses to improve performance with large datasets:
const builder = createQueryBuilder({
  host: 'http://localhost:8123',
  username: 'default',
  password: 'password',
  database: 'default',
  compression: {
    request: true,  // Compress request bodies
    response: true  // Decompress response bodies
  }
});
Application Name
Set a custom application name to identify your application in ClickHouse logs:
const builder = createQueryBuilder({
  host: 'http://localhost:8123',
  username: 'default',
  password: 'password',
  database: 'default',
  application: 'MyAnalyticsApp'
});
Keep Alive
Configure the keep-alive behavior for the connection:
const builder = createQueryBuilder({
  host: 'http://localhost:8123',
  username: 'default',
  password: 'password',
  database: 'default',
  keep_alive: {
    enabled: true
  }
});
Custom Logging
Configure custom logging for the ClickHouse client:
const builder = createQueryBuilder({
  host: 'http://localhost:8123',
  username: 'default',
  password: 'password',
  database: 'default',
  log: {
    level: 'debug' // 'trace', 'debug', 'info', 'warn', 'error'
  }
});
ClickHouse Settings
Pass custom ClickHouse settings to control query execution behavior:
const builder = createQueryBuilder({
  host: 'http://localhost:8123',
  username: 'default',
  password: 'password',
  database: 'default',
  clickhouse_settings: {
    max_execution_time: 30,
    max_block_size: 10000,
    async_insert: 1,
    wait_for_async_insert: 1
  }
});
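These options are independent and can be combined in a single configuration. For example, the following sketch combines several of the options shown above (all values are illustrative):

const builder = createQueryBuilder({
  host: 'http://localhost:8123',
  username: 'default',
  password: 'password',
  database: 'default',
  application: 'MyAnalyticsApp',   // identify the app in ClickHouse logs
  request_timeout: 60000,          // 60-second request timeout
  compression: {
    request: true,
    response: true
  },
  clickhouse_settings: {
    max_execution_time: 30
  }
});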