'use strict';
/* eslint-disable no-unused-vars */
const { readFileSync } = require("fs");
const { Buffer } = require('buffer');
const { log } = require('./logging');
const { validateTableName, validateColumnName } = require('./validation');
const { SenderOptions, HTTP, HTTPS, TCP, TCPS } = require('./options');
const http = require('http');
const https = require('https');
const net = require('net');
const tls = require('tls');
const crypto = require('crypto');
const HTTP_NO_CONTENT = 204; // success
const DEFAULT_HTTP_AUTO_FLUSH_ROWS = 75000;
const DEFAULT_TCP_AUTO_FLUSH_ROWS = 600;
const DEFAULT_AUTO_FLUSH_INTERVAL = 1000; // 1 sec
const DEFAULT_MAX_NAME_LENGTH = 127;
const DEFAULT_REQUEST_MIN_THROUGHPUT = 102400; // 100 KB/sec
const DEFAULT_REQUEST_TIMEOUT = 10000; // 10 sec
const DEFAULT_RETRY_TIMEOUT = 10000; // 10 sec
const DEFAULT_BUFFER_SIZE = 65536; // 64 KB
const DEFAULT_MAX_BUFFER_SIZE = 104857600; // 100 MB
// default options for HTTP agent
// - persistent connections with 1 minute idle timeout, server side has 5 minutes set by default
// - max open connections is set to 256, same as server side default
const DEFAULT_HTTP_AGENT_CONFIG = {
maxSockets: 256,
keepAlive: true,
timeout: 60000 // 1 min
}
// an arbitrary public key, not used in authentication
// only used to construct a valid JWK token which is accepted by the crypto API
const PUBLIC_KEY = {
x: 'aultdA0PjhD_cWViqKKyL5chm6H1n-BiZBo_48T-uqc',
y: '__ptaol41JWSpTTL525yVEfzmY8A6Vi_QrW1FjKcHMg'
};
/** @classdesc
* The QuestDB client's API provides methods to connect to the database, ingest data, and close the connection.
* The supported protocols are HTTP and TCP. HTTP is preferred as it provides feedback in the HTTP response. <br>
* Based on benchmarks HTTP also provides higher throughput, if configured to ingest data in bigger batches.
* <p>
* The client supports authentication. <br>
* Authentication details can be passed to the Sender in its configuration options. <br>
* The client supports Basic username/password and Bearer token authentication methods when used with HTTP protocol,
* and JWK token authentication when ingesting data via TCP. <br>
* Please, note that authentication is enabled by default in QuestDB Enterprise only. <br>
* Details on how to configure authentication in the open source version of
* QuestDB: {@link https://questdb.io/docs/reference/api/ilp/authenticate}
* </p>
* <p>
* The client also supports TLS encryption for both, HTTP and TCP transports to provide a secure connection. <br>
* Please, note that the open source version of QuestDB does not support TLS, and requires an external reverse-proxy,
* such as Nginx to enable encryption.
* </p>
* <p>
* The client uses a buffer to store data. It automatically flushes the buffer by sending its content to the server.
* Auto flushing can be disabled via configuration options to gain control over transactions. Initial and maximum
* buffer sizes can also be set.
* </p>
* <p>
* It is recommended that the Sender is created by using one of the static factory methods,
* <i>Sender.fromConfig(configString, extraOptions)</i> or <i>Sender.fromEnv(extraOptions)</i>).
* If the Sender is created via its constructor, at least the SenderOptions configuration object should be
* initialized from a configuration string to make sure that the parameters are validated. <br>
* Detailed description of the Sender's configuration options can be found in
* the <a href="SenderOptions.html">SenderOptions</a> documentation.
* </p>
* <p>
* Extra options can be provided to the Sender in the <i>extraOptions</i> configuration object. <br>
* A custom logging function and a custom HTTP(S) agent can be passed to the Sender in this object. <br>
* The logger implementation provides the option to direct log messages to the same place where the host application's
* log is saved. The default logger writes to the console. <br>
* The custom HTTP(S) agent option becomes handy if there is a need to modify the default options set for the
* HTTP(S) connections. A popular setting would be disabling persistent connections, in this case an agent can be
* passed to the Sender with <i>keepAlive</i> set to <i>false</i>. <br>
* For example: <i>Sender.fromConfig(`http::addr=host:port`, { agent: new http.Agent({ keepAlive: false })})</i> <br>
* If no custom agent is configured, the Sender will use its own agent which overrides some default values
* of <i>http.Agent</i>/<i>https.Agent</i>. The Sender's own agent uses persistent connections with 1 minute idle
* timeout, and limits the number of open connections to the server, which is set to 256 for each host.
* </p>
*/
class Sender {
/** @private */ static DEFAULT_HTTP_AGENT;
/** @private */ static DEFAULT_HTTPS_AGENT;
/** @private */ http; // true if the protocol is HTTP/HTTPS, false if it is TCP/TCPS
/** @private */ secure; // true if the protocol is HTTPS or TCPS, false otherwise
/** @private */ host;
/** @private */ port;
/** @private */ socket;
/** @private */ username;
/** @private */ password;
/** @private */ token;
/** @private */ tlsVerify;
/** @private */ tlsCA;
/** @private */ bufferSize;
/** @private */ maxBufferSize;
/** @private */ buffer;
/** @private */ toBuffer;
/** @private */ doResolve;
/** @private */ position;
/** @private */ endOfLastRow;
/** @private */ autoFlush;
/** @private */ autoFlushRows;
/** @private */ autoFlushInterval;
/** @private */ lastFlushTime;
/** @private */ pendingRowCount;
/** @private */ requestMinThroughput;
/** @private */ requestTimeout;
/** @private */ retryTimeout;
/** @private */ hasTable;
/** @private */ hasSymbols;
/** @private */ hasColumns;
/** @private */ maxNameLength;
/** @private */ log;
/** @private */ agent;
/**
* Creates an instance of Sender.
*
* @param {SenderOptions} options - Sender configuration object. <br>
* See SenderOptions documentation for detailed description of configuration options. <br>
*/
constructor(options) {
if (!options || !options.protocol) {
throw new Error('The \'protocol\' option is mandatory');
}
replaceDeprecatedOptions(options);
this.log = typeof options.log === 'function' ? options.log : log;
switch (options.protocol) {
case HTTP:
this.http = true;
this.secure = false;
this.agent = options.agent instanceof http.Agent ? options.agent : this.getDefaultHttpAgent();
break;
case HTTPS:
this.http = true;
this.secure = true;
this.agent = options.agent instanceof https.Agent ? options.agent : this.getDefaultHttpsAgent();
break;
case TCP:
this.http = false;
this.secure = false;
break;
case TCPS:
this.http = false;
this.secure = true;
break;
default:
throw new Error(`Invalid protocol: '${options.protocol}'`);
}
if (this.http) {
this.username = options.username;
this.password = options.password;
this.token = options.token;
if (!options.port) {
options.port = 9000;
}
} else {
if (!options.auth && !options.jwk) {
constructAuth(options);
}
this.jwk = constructJwk(options);
if (!options.port) {
options.port = 9009;
}
}
this.host = options.host;
this.port = options.port;
this.tlsVerify = isBoolean(options.tls_verify) ? options.tls_verify : true;
this.tlsCA = options.tls_ca ? readFileSync(options.tls_ca) : undefined;
this.autoFlush = isBoolean(options.auto_flush) ? options.auto_flush : true;
this.autoFlushRows = isInteger(options.auto_flush_rows, 0) ? options.auto_flush_rows : (this.http ? DEFAULT_HTTP_AUTO_FLUSH_ROWS : DEFAULT_TCP_AUTO_FLUSH_ROWS);
this.autoFlushInterval = isInteger(options.auto_flush_interval, 0) ? options.auto_flush_interval : DEFAULT_AUTO_FLUSH_INTERVAL;
this.maxNameLength = isInteger(options.max_name_len, 1) ? options.max_name_len : DEFAULT_MAX_NAME_LENGTH;
this.requestMinThroughput = isInteger(options.request_min_throughput, 1) ? options.request_min_throughput : DEFAULT_REQUEST_MIN_THROUGHPUT;
this.requestTimeout = isInteger(options.request_timeout, 1) ? options.request_timeout : DEFAULT_REQUEST_TIMEOUT;
this.retryTimeout = isInteger(options.retry_timeout, 0) ? options.retry_timeout : DEFAULT_RETRY_TIMEOUT;
const noCopy = isBoolean(options.copy_buffer) && !options.copy_buffer;
this.toBuffer = noCopy ? this.toBufferView : this.toBufferNew;
this.doResolve = noCopy
? (resolve) => {
compact(this);
resolve(true);
}
: (resolve) => {
resolve(true);
}
this.maxBufferSize = isInteger(options.max_buf_size, 1) ? options.max_buf_size : DEFAULT_MAX_BUFFER_SIZE;
this.resize(isInteger(options.init_buf_size, 1) ? options.init_buf_size : DEFAULT_BUFFER_SIZE);
this.reset();
}
/**
* Creates a Sender options object by parsing the provided configuration string.
*
* @param {string} configurationString - Configuration string. <br>
* @param {object} extraOptions - Optional extra configuration. <br>
* - 'log' is a logging function used by the <a href="Sender.html">Sender</a>. <br>
* Prototype: <i>(level: 'error'|'warn'|'info'|'debug', message: string) => void</i>. <br>
* - 'agent' is a custom http/https agent used by the <a href="Sender.html">Sender</a> when http/https transport is used. <br>
* A <i>http.Agent</i> or <i>https.Agent</i> object is expected.
*
* @return {Sender} A Sender object initialized from the provided configuration string.
*/
static fromConfig(configurationString, extraOptions = undefined) {
return new Sender(SenderOptions.fromConfig(configurationString, extraOptions));
}
/**
* Creates a Sender options object by parsing the configuration string set in the <b>QDB_CLIENT_CONF</b> environment variable.
*
* @param {object} extraOptions - Optional extra configuration. <br>
* - 'log' is a logging function used by the <a href="Sender.html">Sender</a>. <br>
* Prototype: <i>(level: 'error'|'warn'|'info'|'debug', message: string) => void</i>. <br>
* - 'agent' is a custom http/https agent used by the <a href="Sender.html">Sender</a> when http/https transport is used. <br>
* A <i>http.Agent</i> or <i>https.Agent</i> object is expected.
*
* @return {Sender} A Sender object initialized from the <b>QDB_CLIENT_CONF</b> environment variable.
*/
static fromEnv(extraOptions = undefined) {
return new Sender(SenderOptions.fromConfig(process.env.QDB_CLIENT_CONF, extraOptions));
}
/**
* Extends the size of the sender's buffer. <br>
* Can be used to increase the size of buffer if overflown.
* The buffer's content is copied into the new buffer.
*
* @param {number} bufferSize - New size of the buffer used by the sender, provided in bytes.
*/
resize(bufferSize) {
if (bufferSize > this.maxBufferSize) {
throw new Error(`Max buffer size is ${this.maxBufferSize} bytes, requested buffer size: ${bufferSize}`);
}
this.bufferSize = bufferSize;
// Allocating an extra byte because Buffer.write() does not fail if the length of the data to be written is
// longer than the size of the buffer. It simply just writes whatever it can, and returns.
// If we can write into the extra byte, that indicates buffer overflow.
// See the check in our write() function.
const newBuffer = Buffer.alloc(this.bufferSize + 1, 0, 'utf8');
if (this.buffer) {
this.buffer.copy(newBuffer);
}
this.buffer = newBuffer;
}
/**
* Resets the buffer, data added to the buffer will be lost. <br>
* In other words it clears the buffer and sets the writing position to the beginning of the buffer.
*
* @return {Sender} Returns with a reference to this sender.
*/
reset() {
this.position = 0;
this.lastFlushTime = Date.now();
this.pendingRowCount = 0;
startNewRow(this);
return this;
}
/**
* Creates a TCP connection to the database.
*
* @param {net.NetConnectOpts | tls.ConnectionOptions} connectOptions - Connection options, host and port are required.
*
* @return {Promise<boolean>} Resolves to true if the client is connected.
*/
connect(connectOptions = undefined) {
if (this.http) {
throw new Error('\'connect()\' should be called only if the sender connects via TCP');
}
if (!connectOptions) {
connectOptions = {
host: this.host,
port: this.port,
ca: this.tlsCA
}
}
if (!connectOptions.host) {
throw new Error('Hostname is not set');
}
if (!connectOptions.port) {
throw new Error('Port is not set');
}
let self = this;
return new Promise((resolve, reject) => {
if (this.socket) {
throw new Error('Sender connected already');
}
let authenticated = false;
let data;
this.socket = !this.secure
? net.connect(connectOptions)
: tls.connect(connectOptions, () => {
if (authenticated) {
resolve(true);
}
});
this.socket.setKeepAlive(true);
this.socket.on('data', async raw => {
data = !data ? raw : Buffer.concat([data, raw]);
if (!authenticated) {
authenticated = await authenticate(self, data);
if (authenticated) {
resolve(true);
}
} else {
this.log('warn', `Received unexpected data: ${data}`);
}
})
.on('ready', async () => {
this.log('info', `Successfully connected to ${connectOptions.host}:${connectOptions.port}`);
if (self.jwk) {
this.log('info', `Authenticating with ${connectOptions.host}:${connectOptions.port}`);
await self.socket.write(`${self.jwk.kid}\n`, err => {
if (err) {
reject(err);
}
});
} else {
authenticated = true;
if (!self.secure || !self.tlsVerify) {
resolve(true);
}
}
})
.on('error', err => {
self.log('error', err);
if (err.code !== 'SELF_SIGNED_CERT_IN_CHAIN' || self.tlsVerify) {
reject(err);
}
});
});
}
/**
* @ignore
* @return {http.Agent} Returns the default http agent.
*/
getDefaultHttpAgent() {
if (!Sender.DEFAULT_HTTP_AGENT) {
Sender.DEFAULT_HTTP_AGENT = new http.Agent(DEFAULT_HTTP_AGENT_CONFIG);
}
return Sender.DEFAULT_HTTP_AGENT;
}
/**
* @ignore
* @return {https.Agent} Returns the default https agent.
*/
getDefaultHttpsAgent() {
if (!Sender.DEFAULT_HTTPS_AGENT) {
Sender.DEFAULT_HTTPS_AGENT = new https.Agent(DEFAULT_HTTP_AGENT_CONFIG);
}
return Sender.DEFAULT_HTTPS_AGENT;
}
/**
* Closes the TCP connection to the database. <br>
* Data sitting in the Sender's buffer will be lost unless flush() is called before close().
*/
async close() {
if (this.socket) {
const address = this.socket.remoteAddress;
const port = this.socket.remotePort;
this.socket.destroy();
this.socket = null;
this.log('info', `Connection to ${address}:${port} is closed`);
}
}
/**
* Sends the buffer's content to the database and compacts the buffer.
* If the last row is not finished it stays in the sender's buffer.
*
* @return {Promise<boolean>} Resolves to true when there was data in the buffer to send.
*/
async flush() {
const data = this.toBuffer(this.endOfLastRow);
if (!data) {
return false;
}
if (this.http) {
const request = this.secure ? https.request : http.request;
const options = createRequestOptions(this, data);
return sendHttp(this, request, options, data, this.retryTimeout);
} else {
if (!this.socket) {
throw new Error('Sender is not connected');
}
return sendTcp(this, data);
}
}
/**
* @ignore
* @return {Buffer} Returns a cropped buffer ready to send to the server or null if there is nothing to send.
* The returned buffer is backed by the sender's buffer.
*/
toBufferView(pos = this.position) {
return pos > 0
? this.buffer.subarray(0, pos)
: null;
}
/**
* @ignore
* @return {Buffer} Returns a cropped buffer ready to send to the server or null if there is nothing to send.
* The returned buffer is a copy of the sender's buffer.
*/
toBufferNew(pos = this.position) {
if (pos > 0) {
const data = Buffer.allocUnsafe(pos);
this.buffer.copy(data, 0, 0, pos);
compact(this);
return data;
}
return null;
}
/**
* Write the table name into the buffer of the sender.
*
* @param {string} table - Table name.
* @return {Sender} Returns with a reference to this sender.
*/
table(table) {
if (typeof table !== 'string') {
throw new Error(`Table name must be a string, received ${typeof table}`);
}
if (this.hasTable) {
throw new Error('Table name has already been set');
}
validateTableName(table, this.maxNameLength);
checkCapacity(this, [table]);
writeEscaped(this, table);
this.hasTable = true;
return this;
}
/**
* Write a symbol name and value into the buffer of the sender.
*
* @param {string} name - Symbol name.
* @param {any} value - Symbol value, toString() will be called to extract the actual symbol value from the parameter.
* @return {Sender} Returns with a reference to this sender.
*/
symbol(name, value) {
if (typeof name !== 'string') {
throw new Error(`Symbol name must be a string, received ${typeof name}`);
}
if (!this.hasTable || this.hasColumns) {
throw new Error('Symbol can be added only after table name is set and before any column added');
}
const valueStr = value.toString();
checkCapacity(this, [name, valueStr], 2 + name.length + valueStr.length);
write(this, ',');
validateColumnName(name, this.maxNameLength);
writeEscaped(this, name);
write(this, '=');
writeEscaped(this, valueStr);
this.hasSymbols = true;
return this;
}
/**
* Write a string column with its value into the buffer of the sender.
*
* @param {string} name - Column name.
* @param {string} value - Column value, accepts only string values.
* @return {Sender} Returns with a reference to this sender.
*/
stringColumn(name, value) {
writeColumn(this, name, value, () => {
checkCapacity(this, [value], 2 + value.length);
write(this, '"');
writeEscaped(this, value, true);
write(this, '"');
}, 'string');
return this;
}
/**
* Write a boolean column with its value into the buffer of the sender.
*
* @param {string} name - Column name.
* @param {boolean} value - Column value, accepts only boolean values.
* @return {Sender} Returns with a reference to this sender.
*/
booleanColumn(name, value) {
writeColumn(this, name, value, () => {
checkCapacity(this, [], 1);
write(this, value ? 't' : 'f');
}, 'boolean');
return this;
}
/**
* Write a float column with its value into the buffer of the sender.
*
* @param {string} name - Column name.
* @param {number} value - Column value, accepts only number values.
* @return {Sender} Returns with a reference to this sender.
*/
floatColumn(name, value) {
writeColumn(this, name, value, () => {
const valueStr = value.toString();
checkCapacity(this, [valueStr], valueStr.length);
write(this, valueStr);
}, 'number');
return this;
}
/**
* Write an integer column with its value into the buffer of the sender.
*
* @param {string} name - Column name.
* @param {number} value - Column value, accepts only number values.
* @return {Sender} Returns with a reference to this sender.
*/
intColumn(name, value) {
if (!Number.isInteger(value)) {
throw new Error(`Value must be an integer, received ${value}`);
}
writeColumn(this, name, value, () => {
const valueStr = value.toString();
checkCapacity(this, [valueStr], 1 + valueStr.length);
write(this, valueStr);
write(this, 'i');
});
return this;
}
/**
* Write a timestamp column with its value into the buffer of the sender.
*
* @param {string} name - Column name.
* @param {number | bigint} value - Epoch timestamp, accepts numbers or BigInts.
* @param {string} [unit=us] - Timestamp unit. Supported values: 'ns' - nanoseconds, 'us' - microseconds, 'ms' - milliseconds. Defaults to 'us'.
* @return {Sender} Returns with a reference to this sender.
*/
timestampColumn(name, value, unit = 'us') {
if (typeof value !== 'bigint' && !Number.isInteger(value)) {
throw new Error(`Value must be an integer or BigInt, received ${value}`);
}
writeColumn(this, name, value, () => {
const valueMicros = timestampToMicros(BigInt(value), unit);
const valueStr = valueMicros.toString();
checkCapacity(this, [valueStr], 1 + valueStr.length);
write(this, valueStr);
write(this, 't');
});
return this;
}
/**
* Closing the row after writing the designated timestamp into the buffer of the sender.
*
* @param {number | bigint} timestamp - Designated epoch timestamp, accepts numbers or BigInts.
* @param {string} [unit=us] - Timestamp unit. Supported values: 'ns' - nanoseconds, 'us' - microseconds, 'ms' - milliseconds. Defaults to 'us'.
*/
async at(timestamp, unit = 'us') {
if (!this.hasSymbols && !this.hasColumns) {
throw new Error('The row must have a symbol or column set before it is closed');
}
if (typeof timestamp !== 'bigint' && !Number.isInteger(timestamp)) {
throw new Error(`Designated timestamp must be an integer or BigInt, received ${timestamp}`);
}
const timestampNanos = timestampToNanos(BigInt(timestamp), unit);
const timestampStr = timestampNanos.toString();
checkCapacity(this, [], 2 + timestampStr.length);
write(this, ' ');
write(this, timestampStr);
write(this, '\n');
this.pendingRowCount++;
startNewRow(this);
await autoFlush(this);
}
/**
* Closing the row without writing designated timestamp into the buffer of the sender. <br>
* Designated timestamp will be populated by the server on this record.
*/
async atNow() {
if (!this.hasSymbols && !this.hasColumns) {
throw new Error('The row must have a symbol or column set before it is closed');
}
checkCapacity(this, [], 1);
write(this, '\n');
this.pendingRowCount++;
startNewRow(this);
await autoFlush(this);
}
}
function isBoolean(value) {
return typeof value === 'boolean';
}
function isInteger(value, lowerBound) {
return typeof value === 'number' && Number.isInteger(value) && value >= lowerBound;
}
async function authenticate(sender, challenge) {
// Check for trailing \n which ends the challenge
if (challenge.slice(-1).readInt8() === 10) {
const keyObject = await crypto.createPrivateKey(
{'key': sender.jwk, 'format': 'jwk'}
);
const signature = await crypto.sign(
'RSA-SHA256',
challenge.slice(0, challenge.length - 1),
keyObject
);
return new Promise((resolve, reject) => {
sender.socket.write(`${Buffer.from(signature).toString('base64')}\n`, err => {
err ? reject(err) : resolve(true);
});
});
}
return false;
}
function startNewRow(sender) {
sender.endOfLastRow = sender.position;
sender.hasTable = false;
sender.hasSymbols = false;
sender.hasColumns = false;
}
function createRequestOptions(sender, data) {
const timeoutMillis = (data.length / sender.requestMinThroughput) * 1000 + sender.requestTimeout;
const options = {
hostname: sender.host,
port: sender.port,
agent: sender.agent,
path: '/write?precision=n',
method: 'POST',
timeout: timeoutMillis
};
if (sender.secure) {
options.rejectUnauthorized = sender.tlsVerify;
options.ca = sender.tlsCA;
}
return options;
}
function sendHttp(sender, request, options, data, retryTimeout, retryBegin = -1, retryInterval = -1) {
return new Promise((resolve, reject) => {
let statusCode = -1;
const req = request(options, response => {
statusCode = response.statusCode;
const body = [];
response
.on('data', chunk => {
body.push(chunk);
})
.on('error', err => {
sender.log('error', `resp err=${err}`);
});
if (statusCode === HTTP_NO_CONTENT) {
response.on('end', () => {
if (body.length > 0) {
sender.log('warn', `Unexpected message from server: ${Buffer.concat(body)}`);
}
sender.doResolve(resolve);
});
} else {
req.destroy(new Error(`HTTP request failed, statusCode=${statusCode}, error=${Buffer.concat(body)}`));
}
});
if (sender.token) {
req.setHeader('Authorization', 'Bearer ' + sender.token);
} else if (sender.username && sender.password) {
req.setHeader('Authorization', 'Basic ' + Buffer.from(sender.username + ':' + sender.password).toString('base64'));
}
req.on('timeout', () => {
// set a retryable error code
statusCode = 524;
req.destroy(new Error('HTTP request timeout, no response from server in time'));
});
req.on('error', err => {
// if the error is thrown while the request is sent, statusCode is -1 => no retry
// request timeout comes through with statusCode 524 => retry
// if the error is thrown while the response is processed, the statusCode is taken from the response => retry depends on statusCode
if (isRetryable(statusCode) && retryTimeout > 0) {
if (retryBegin < 0) {
retryBegin = Date.now();
retryInterval = 10;
} else {
const elapsed = Date.now() - retryBegin;
if (elapsed > retryTimeout) {
reject(err);
return;
}
}
const jitter = Math.floor(Math.random() * 10) - 5;
setTimeout(() => {
retryInterval = Math.min(retryInterval * 2, 1000);
sendHttp(sender, request, options, data, retryTimeout, retryBegin, retryInterval)
.then(() => resolve(true))
.catch(e => reject(e));
}, retryInterval + jitter);
} else {
reject(err);
}
});
req.write(data, err => err ? reject(err) : () => {});
req.end();
});
}
/*
We are retrying on the following response codes (copied from the Rust client):
500: Internal Server Error
503: Service Unavailable
504: Gateway Timeout
// Unofficial extensions
507: Insufficient Storage
509: Bandwidth Limit Exceeded
523: Origin is Unreachable
524: A Timeout Occurred
529: Site is overloaded
599: Network Connect Timeout Error
*/
function isRetryable(statusCode) {
return [500, 503, 504, 507, 509, 523, 524, 529, 599].includes(statusCode);
}
async function autoFlush(sender) {
if (sender.autoFlush && sender.pendingRowCount > 0 && (
(sender.autoFlushRows > 0 && sender.pendingRowCount >= sender.autoFlushRows) ||
(sender.autoFlushInterval > 0 && Date.now() - sender.lastFlushTime >= sender.autoFlushInterval)
)) {
await sender.flush();
}
}
function sendTcp(sender, data) {
return new Promise((resolve, reject) => {
sender.socket.write(data, err => {
err ? reject(err) : sender.doResolve(resolve);
});
});
}
function checkCapacity(sender, data, base = 0) {
let length = base;
for (const str of data) {
length += Buffer.byteLength(str, 'utf8');
}
if (sender.position + length > sender.bufferSize) {
let newSize = sender.bufferSize;
do {
newSize += sender.bufferSize;
} while(sender.position + length > newSize);
sender.resize(newSize);
}
}
function compact(sender) {
if (sender.endOfLastRow > 0) {
sender.buffer.copy(sender.buffer, 0, sender.endOfLastRow, sender.position);
sender.position = sender.position - sender.endOfLastRow;
sender.endOfLastRow = 0;
sender.lastFlushTime = Date.now();
sender.pendingRowCount = 0;
}
}
function writeColumn(sender, name, value, writeValue, valueType) {
if (typeof name !== 'string') {
throw new Error(`Column name must be a string, received ${typeof name}`);
}
if (valueType != null && typeof value !== valueType) {
throw new Error(`Column value must be of type ${valueType}, received ${typeof value}`);
}
if (!sender.hasTable) {
throw new Error('Column can be set only after table name is set');
}
checkCapacity(sender, [name], 2 + name.length);
write(sender, sender.hasColumns ? ',' : ' ');
validateColumnName(name, sender.maxNameLength);
writeEscaped(sender, name);
write(sender, '=');
writeValue();
sender.hasColumns = true;
}
function write(sender, data) {
sender.position += sender.buffer.write(data, sender.position);
if (sender.position > sender.bufferSize) {
throw new Error(`Buffer overflow [position=${sender.position}, bufferSize=${sender.bufferSize}]`);
}
}
function writeEscaped(sender, data, quoted = false) {
for (const ch of data) {
if (ch > '\\') {
write(sender, ch);
continue;
}
switch (ch) {
case ' ':
case ',':
case '=':
if (!quoted) {
write(sender, '\\');
}
write(sender, ch);
break;
case '\n':
case '\r':
write(sender, '\\');
write(sender, ch);
break;
case '"':
if (quoted) {
write(sender, '\\');
}
write(sender, ch);
break;
case '\\':
write(sender, '\\\\');
break;
default:
write(sender, ch);
break;
}
}
}
function timestampToMicros(timestamp, unit) {
switch (unit) {
case 'ns':
return timestamp / 1000n;
case 'us':
return timestamp;
case 'ms':
return timestamp * 1000n;
default:
throw new Error('Unknown timestamp unit: ' + unit);
}
}
function timestampToNanos(timestamp, unit) {
switch (unit) {
case 'ns':
return timestamp;
case 'us':
return timestamp * 1000n;
case 'ms':
return timestamp * 1000_000n;
default:
throw new Error('Unknown timestamp unit: ' + unit);
}
}
function replaceDeprecatedOptions(options) {
// deal with deprecated options
if (options.copyBuffer) {
options.copy_buffer = options.copyBuffer;
options.copyBuffer = undefined;
}
if (options.bufferSize) {
options.init_buf_size = options.bufferSize;
options.bufferSize = undefined;
}
}
function constructAuth(options) {
if (!options.username && !options.token && !options.password) {
// no intention to authenticate
return;
}
if (!options.username || !options.token) {
throw new Error('TCP transport requires a username and a private key for authentication, ' +
'please, specify the \'username\' and \'token\' config options');
}
options.auth = {
keyId: options.username,
token: options.token
};
}
function constructJwk(options) {
if (options.auth) {
if (!options.auth.keyId) {
throw new Error('Missing username, please, specify the \'keyId\' property of the \'auth\' config option. ' +
'For example: new Sender({protocol: \'tcp\', host: \'host\', auth: {keyId: \'username\', token: \'private key\'}})');
}
if (typeof options.auth.keyId !== 'string') {
throw new Error('Please, specify the \'keyId\' property of the \'auth\' config option as a string. ' +
'For example: new Sender({protocol: \'tcp\', host: \'host\', auth: {keyId: \'username\', token: \'private key\'}})');
}
if (!options.auth.token) {
throw new Error('Missing private key, please, specify the \'token\' property of the \'auth\' config option. ' +
'For example: new Sender({protocol: \'tcp\', host: \'host\', auth: {keyId: \'username\', token: \'private key\'}})');
}
if (typeof options.auth.token !== 'string') {
throw new Error('Please, specify the \'token\' property of the \'auth\' config option as a string. ' +
'For example: new Sender({protocol: \'tcp\', host: \'host\', auth: {keyId: \'username\', token: \'private key\'}})');
}
return {
kid: options.auth.keyId,
d: options.auth.token,
...PUBLIC_KEY,
kty: 'EC',
crv: 'P-256'
};
} else {
return options.jwk;
}
}
exports.Sender = Sender;
exports.DEFAULT_BUFFER_SIZE = DEFAULT_BUFFER_SIZE;
exports.DEFAULT_MAX_BUFFER_SIZE = DEFAULT_MAX_BUFFER_SIZE;