QuestDB Node.js Client
Requirements
The client requires Node.js v16 or newer.
Installation
npm i -S @questdb/nodejs-client
Examples
Basic API usage
const { Sender } = require('@questdb/nodejs-client');
/**
 * Basic API usage: connects a Sender to QuestDB, ingests two batches of
 * rows into the 'prices' table and closes the connection.
 *
 * @returns {Promise<number>} resolves to 0 after all rows were flushed and
 *     the connection was closed; rejects if connecting, flushing or closing fails
 */
async function run() {
    const sender = new Sender();

    // connect to QuestDB
    // host and port are required in connect options
    await sender.connect({port: 9009, host: 'localhost'});

    try {
        // add rows to the buffer of the sender
        sender.table('prices').symbol('instrument', 'EURUSD')
            .floatColumn('bid', 1.0195).floatColumn('ask', 1.0221)
            .at(Date.now(), 'ms');
        sender.table('prices').symbol('instrument', 'GBPUSD')
            .floatColumn('bid', 1.2076).floatColumn('ask', 1.2082)
            .at(Date.now(), 'ms');

        // flush the buffer of the sender, sending the data to QuestDB
        // the buffer is cleared after the data is sent and the sender is ready to accept new data
        await sender.flush();

        // add rows to the buffer again and send them to the server
        sender.table('prices').symbol('instrument', 'EURUSD')
            .floatColumn('bid', 1.0197).floatColumn('ask', 1.0224)
            .at(Date.now(), 'ms');
        await sender.flush();
    } finally {
        // close the connection once all rows are ingested; 'finally' makes sure
        // the connection is also released when buffering or flushing throws
        await sender.close();
    }

    // an async function already wraps its return value in a promise,
    // so there is no need to construct one by hand
    return 0;
}
// start the example: print the value run() resolves to, or the error it rejects with
run()
    .then((result) => console.log(result))
    .catch((err) => console.error(err));
Authentication and secure connection
const { Sender } = require('@questdb/nodejs-client');
/**
 * Authentication and secure connection example: creates a Sender with
 * authentication details, connects with the TLS flag set, ingests one
 * row into the 'prices' table and closes the connection.
 *
 * @returns {Promise<void>} resolves after the row was flushed and the connection
 *     was closed; rejects if connecting, flushing or closing fails
 */
async function run() {
    // authentication details
    // (hard-coded to keep the example short)
    const CLIENT_ID = 'testapp';
    const PRIVATE_KEY = '9b9x5WhJywDEuo1KGQWSPNxtX-6X6R2BRCKhYMMY6n8';
    const AUTH = {
        keyId: CLIENT_ID,
        token: PRIVATE_KEY
    };

    // pass the authentication details to the sender
    const sender = new Sender({auth: AUTH});

    // connect() takes an optional second argument;
    // if 'true' is passed, the connection is secured with TLS encryption
    await sender.connect({port: 9009, host: 'localhost'}, true);

    try {
        // send the data over the authenticated and secure connection
        sender.table('prices').symbol('instrument', 'EURUSD')
            .floatColumn('bid', 1.0197).floatColumn('ask', 1.0224)
            .at(Date.now(), 'ms');
        await sender.flush();
    } finally {
        // close the connection once all rows are ingested; 'finally' makes sure
        // the connection is also released when buffering or flushing throws
        await sender.close();
    }
}
// start the example and report any failure on stderr
run().catch((err) => console.error(err));
TypeScript example
import { Sender } from '@questdb/nodejs-client';
/**
 * TypeScript variant of the authenticated example: creates a Sender with
 * authentication details, connects with the TLS flag set, ingests one
 * row into the 'prices' table and closes the connection.
 *
 * The function produces no value, so it is typed as Promise<void>.
 *
 * @returns a promise that resolves after the row was flushed and the connection
 *     was closed, and rejects if connecting, flushing or closing fails
 */
async function run(): Promise<void> {
    // authentication details
    // (hard-coded to keep the example short)
    const CLIENT_ID: string = 'testapp';
    const PRIVATE_KEY: string = '9b9x5WhJywDEuo1KGQWSPNxtX-6X6R2BRCKhYMMY6n8';
    // the annotation has to name the same properties as the literal ('keyId' and 'token'),
    // otherwise the compiler rejects the object
    const AUTH: { keyId: string, token: string } = {
        keyId: CLIENT_ID,
        token: PRIVATE_KEY
    };

    // pass the authentication details to the sender
    const sender: Sender = new Sender({auth: AUTH});

    // connect() takes an optional second argument;
    // if 'true' is passed, the connection is secured with TLS encryption
    await sender.connect({port: 9009, host: 'localhost'}, true);

    try {
        // send the data over the authenticated and secure connection
        sender.table('prices').symbol('instrument', 'EURUSD')
            .floatColumn('bid', 1.0197).floatColumn('ask', 1.0224)
            .at(Date.now(), 'ms');
        await sender.flush();
    } finally {
        // close the connection once all rows are ingested; 'finally' makes sure
        // the connection is also released when buffering or flushing throws
        await sender.close();
    }
}
// start the example and report any failure on stderr
run().catch((err: unknown) => console.error(err));
Worker threads example
const { Sender } = require('@questdb/nodejs-client');
const { Worker, isMainThread, parentPort, workerData } = require('worker_threads');
/**
 * Fake venue.
 * Generator producing random price ticks for a ticker; the feed closes
 * after a random delay of at most 5 seconds.
 *
 * The body of a generator only starts running on the first next() call, so
 * the closing timer is armed at that point. The timer callback can only run
 * while the consumer gives control back to the event loop between ticks.
 *
 * @param ticker identifier copied into every generated tick
 */
function* venue(ticker) {
    let closed = false;
    setTimeout(() => {
        closed = true;
    }, rndInt(5000));

    while (true) {
        if (closed) {
            return;
        }
        yield { 'ticker': ticker, 'price': Math.random() };
    }
}
/**
 * Market data feed simulator.
 * Uses the fake venue to deliver price updates to the feed handler
 * (the onTick() callback) until the venue closes the feed.
 *
 * @param ticker ticker to subscribe to; passed on to the venue
 * @param onTick callback invoked (and awaited) with each tick
 * @returns a promise that resolves when the feed is exhausted
 */
async function subscribe(ticker, onTick) {
    // subscribe to the ticker requested by the caller rather than reading
    // it from the worker's global data
    const feed = venue(ticker);
    for (const tick of feed) {
        await onTick(tick);
        // pause for up to 30 ms between ticks; the pause also hands control
        // back to the event loop
        await sleep(rndInt(30));
    }
}
/**
 * Entry point shared by the main thread and the worker threads.
 *
 * On the main thread it starts one worker (running this same file) per
 * ticker and logs the messages the workers post back. On a worker thread
 * it ingests every price update of the assigned ticker into the 'prices'
 * table and reports the number of ingested prices to the main thread.
 *
 * @returns {Promise<void>}
 */
async function run() {
    if (isMainThread) {
        const tickers = ['t1', 't2', 't3', 't4'];
        // main thread starts a worker thread for each ticker
        // for...of iterates the ticker names; for...in would yield the array indices
        for (const ticker of tickers) {
            new Worker(__filename, { workerData: { ticker: ticker } })
                .on('error', (err) => { throw err; })
                .on('exit', () => { console.log(`${ticker} thread exiting...`); })
                .on('message', (msg) => {
                    console.log(`Ingested ${msg.count} prices for ticker ${msg.ticker}`);
                });
        }
    } else {
        // it is important that each worker has a dedicated sender object
        // threads cannot share the sender because they would write into the same buffer
        const sender = new Sender();
        await sender.connect({ port: 9009, host: 'localhost' });

        try {
            // subscribe for the market data of the ticker assigned to the worker
            // ingest each price update into the database using the sender
            let count = 0;
            await subscribe(workerData.ticker, async (tick) => {
                sender
                    .table('prices')
                    .symbol('ticker', tick.ticker)
                    .floatColumn('price', tick.price)
                    .at(Date.now(), 'ms');
                await sender.flush();
                count++;
            });

            // let the main thread know how many prices were ingested
            parentPort.postMessage({'ticker': workerData.ticker, 'count': count});
        } finally {
            // close the connection to the database, also when ingestion failed
            await sender.close();
        }
    }
}
/**
 * Returns a promise that is resolved by a timer scheduled with a delay of
 * `ms` milliseconds; the promise resolves without a value.
 */
function sleep(ms) {
    return new Promise((wakeUp) => {
        setTimeout(wakeUp, ms);
    });
}
/**
 * Returns a random integer computed as floor(random * limit + 1).
 * For a positive integer `limit` the result lies between 1 and `limit`.
 */
function rndInt(limit) {
    const scaled = Math.random() * limit;
    return Math.floor(scaled + 1);
}
// start the example (on the main thread and on every worker):
// print the value run() resolves to, or the error it rejects with
run()
    .then((result) => console.log(result))
    .catch((err) => console.error(err));