var node_fs = require('node:fs');
var node_buffer = require('node:buffer');
var net = require('node:net');
var tls = require('node:tls');
var crypto = require('node:crypto');
var undici = require('undici');
function _interopDefault (e) { return e && e.__esModule ? e : { default: e }; }
var net__default = /*#__PURE__*/_interopDefault(net);
var tls__default = /*#__PURE__*/_interopDefault(tls);
var crypto__default = /*#__PURE__*/_interopDefault(crypto);
const LOG_LEVELS = {
error: {
log: console.error,
criticality: 3
},
warn: {
log: console.warn,
criticality: 2
},
info: {
log: console.info,
criticality: 1
},
debug: {
log: console.debug,
criticality: 0
}
};
const DEFAULT_CRITICALITY = LOG_LEVELS.info.criticality;
/**
* Simple logger to write log messages to the console. <br>
* Supported logging levels are `error`, `warn`, `info` and `debug`. <br>
* Throws an error if logging level is invalid.
*
* @param {'error'|'warn'|'info'|'debug'} level - The log level of the message.
* @param {string} message - The log message.
*/ function log(level, message) {
const logLevel = LOG_LEVELS[level];
if (!logLevel) {
throw new Error(`Invalid log level: '${level}'`);
}
if (logLevel.criticality >= DEFAULT_CRITICALITY) {
logLevel.log(message);
}
}
/**
* Validates a table name. <br>
* Throws an error if table name is invalid.
*
* @param {string} name - The table name to validate.
* @param {number} maxNameLength - The maximum length of table names.
*/ function validateTableName(name, maxNameLength) {
const len = name.length;
if (len > maxNameLength) {
throw new Error(`Table name is too long, max length is ${maxNameLength}`);
}
if (len === 0) {
throw new Error("Empty string is not allowed as table name");
}
for(let i = 0; i < len; i++){
const ch = name[i];
switch(ch){
case ".":
if (i === 0 || i === len - 1 || name[i - 1] === ".") // single dot is allowed in the middle only
// starting with a dot hides directory in Linux
// ending with a dot can be trimmed by some Windows versions / file systems
// double or triple dot looks suspicious
// single dot allowed as compatibility,
// when someone uploads 'file_name.csv' the file name used as the table name
throw new Error("Table name cannot start or end with a dot, and only a single dot allowed");
break;
case "?":
case ",":
case "'":
case '"':
case "\\":
case "/":
case ":":
case ")":
case "(":
case "+":
case "*":
case "%":
case "~":
case "\u0000":
case "\u0001":
case "\u0002":
case "\u0003":
case "\u0004":
case "\u0005":
case "\u0006":
case "\u0007":
case "\u0008":
case "\u0009":
case "\u000B":
case "\u000c":
case "\r":
case "\n":
case "\u000e":
case "\u000f":
case "\u007f":
case "\ufeff":
throw new Error(`Invalid character in table name: ${ch}`);
}
}
}
/**
* Validates a column name. <br>
* Throws an error if column name is invalid.
*
* @param {string} name - The column name to validate.
* @param {number} maxNameLength - The maximum length of column names.
*/ function validateColumnName(name, maxNameLength) {
const len = name.length;
if (len > maxNameLength) {
throw new Error(`Column name is too long, max length is ${maxNameLength}`);
}
if (len === 0) {
throw new Error("Empty string is not allowed as column name");
}
for (const ch of name){
switch(ch){
case "?":
case ".":
case ",":
case "'":
case '"':
case "\\":
case "/":
case ":":
case ")":
case "(":
case "+":
case "-":
case "*":
case "%":
case "~":
case "\u0000":
case "\u0001":
case "\u0002":
case "\u0003":
case "\u0004":
case "\u0005":
case "\u0006":
case "\u0007":
case "\u0008":
case "\u0009":
case "\u000B":
case "\u000c":
case "\r":
case "\n":
case "\u000e":
case "\u000f":
case "\u007f":
case "\ufeff":
throw new Error(`Invalid character in column name: ${ch}`);
}
}
}
const HTTP_PORT = 9000;
const TCP_PORT = 9009;
const HTTP = "http";
const HTTPS = "https";
const TCP = "tcp";
const TCPS = "tcps";
const ON = "on";
const OFF = "off";
const UNSAFE_OFF = "unsafe_off";
/** @classdesc
* <a href="Sender.html">Sender</a> configuration options. <br>
* <br>
* Properties of the object are initialized through a configuration string. <br>
* The configuration string has the following format: <i><protocol>::<key>=<value><key>=<value>...;</i> <br>
* The keys are case-sensitive, the trailing semicolon is optional. <br>
* The values are validated, and an error is thrown if the format is invalid. <br>
* <br>
* Connection and protocol options
* <ul>
* <li> <b>protocol</b>: <i>enum, accepted values: http, https, tcp, tcps</i> - The protocol used to communicate with the server. <br>
* When <i>https</i> or <i>tcps</i> used, the connection is secured with TLS encryption.
* </li>
* <li> addr: <i>string</i> - Hostname and port, separated by colon. This key is mandatory, but the port part is optional. <br>
* If no port is specified, a default will be used. <br>
* When the protocol is HTTP/HTTPS, the port defaults to 9000. When the protocol is TCP/TCPS, the port defaults to 9009. <br>
* <br>
* Examples: <i>http::addr=localhost:9000</i>, <i>https::addr=localhost:9000</i>, <i>http::addr=localhost</i>, <i>tcp::addr=localhost:9009</i>
* </li>
* </ul>
* <br>
* Authentication options
* <ul>
* <li> username: <i>string</i> - Used for authentication. <br>
* For HTTP, Basic Authentication requires the <i>password</i> option. <br>
* For TCP with JWK token authentication, <i>token</i> option is required.
* </li>
* <li> password: <i>string</i> - Password for HTTP Basic authentication, should be accompanied by the <i>username</i> option.
* </li>
* <li> token: <i>string</i> - For HTTP with Bearer authentication, this is the bearer token. <br>
* For TCP with JWK token authentication, this is the private key part of the JWK token,
* and must be accompanied by the <i>username</i> option.
* </li>
* </ul>
* <br>
* TLS options
* <ul>
* <li> tls_verify: <i>enum, accepted values: on, unsafe_off</i> - When the HTTPS or TCPS protocols are selected, TLS encryption is used. <br>
* By default, the Sender will verify the server's certificate, but this check can be disabled by setting this option to <i>off</i>. This is useful
* non-production environments where self-signed certificates might be used, but should be avoided in production if possible.
* </li>
* <li> tls_ca: <i>string</i> - Path to a file containing the root CA's certificate in PEM format. <br>
* Can be useful when self-signed certificates are used, otherwise should not be set.
* </li>
* </ul>
* <br>
* Auto flush options
* <ul>
* <li> auto_flush: <i>enum, accepted values: on, off</i> - The Sender automatically flushes the buffer by default. This can be switched off
* by setting this option to <i>off</i>. <br>
* When disabled, the flush() method of the Sender has to be called explicitly to make sure data is sent to the server. <br>
* Manual buffer flushing can be useful, especially when we want to use transactions. When the HTTP protocol is used, each flush results in a single HTTP
* request, which becomes a single transaction on the server side. The transaction either succeeds, and all rows sent in the request are
* inserted; or it fails, and none of the rows make it into the database.
* </li>
* <li> auto_flush_rows: <i>integer</i> - The number of rows that will trigger a flush. When set to 0, row-based flushing is disabled. <br>
* The Sender will default this parameter to 75000 rows when HTTP protocol is used, and to 600 in case of TCP protocol.
* </li>
* <li> auto_flush_interval: <i>integer</i> - The number of milliseconds that will trigger a flush, default value is 1000.
* When set to 0, interval-based flushing is disabled. <br>
* Note that the setting is checked only when a new row is added to the buffer. There is no timer registered to flush the buffer automatically.
* </li>
* </ul>
* <br>
* Buffer sizing options
* <ul>
* <li> init_buf_size: <i>integer</i> - Initial buffer size, defaults to 64 KiB in the Sender.
* </li>
* <li> max_buf_size: <i>integer</i> - Maximum buffer size, defaults to 100 MiB in the Sender. <br>
* If the buffer would need to be extended beyond the maximum size, an error is thrown.
* </li>
* </ul>
* <br>
* HTTP request specific options
* <ul>
* <li> request_timeout: <i>integer</i> - The time in milliseconds to wait for a response from the server, set to 10 seconds by default. <br>
* This is in addition to the calculation derived from the <i>request_min_throughput</i> parameter.
* </li>
* <li> request_min_throughput: <i>integer</i> - Minimum expected throughput in bytes per second for HTTP requests, set to 100 KiB/s seconds by default. <br>
* If the throughput is lower than this value, the connection will time out. This is used to calculate an additional
* timeout on top of <i>request_timeout</i>. This is useful for large requests. You can set this value to 0 to disable this logic.
* </li>
* <li> retry_timeout: <i>integer</i> - The time in milliseconds to continue retrying after a failed HTTP request, set to 10 seconds by default. <br>
* The interval between retries is an exponential backoff starting at 10ms and doubling after each failed attempt up to a maximum of 1 second.
* </li>
* </ul>
* <br>
* Other options
* <ul>
* <li> max_name_len: <i>integer</i> - The maximum length of a table or column name, the Sender defaults this parameter to 127. <br>
* Recommended to use the same setting as the server, which also uses 127 by default.
* </li>
* <li> copy_buffer: <i>enum, accepted values: on, off</i> - By default, the Sender creates a new buffer for every flush() call,
* and the data to be sent to the server is copied into this new buffer.
* Setting the flag to <i>off</i> results in reusing the same buffer instance for each flush() call. <br>
* Use this flag only if calls to the client are serialised.
* </li>
* </ul>
*/ class SenderOptions {
/**
* Creates a Sender options object by parsing the provided configuration string.
*
* @param {string} configurationString - Configuration string. <br>
* @param {object} extraOptions - Optional extra configuration. <br>
* - 'log' is a logging function used by the <a href="Sender.html">Sender</a>. <br>
* Prototype: <i>(level: 'error'|'warn'|'info'|'debug', message: string) => void</i>. <br>
* - 'agent' is a custom http/https agent used by the <a href="Sender.html">Sender</a> when http/https transport is used. <br>
* A <i>http.Agent</i> or <i>https.Agent</i> object is expected.
*/ constructor(configurationString, extraOptions = undefined){
parseConfigurationString(this, configurationString);
if (extraOptions) {
if (extraOptions.log && typeof extraOptions.log !== "function") {
throw new Error("Invalid logging function");
}
this.log = extraOptions.log;
if (extraOptions.agent && !(extraOptions.agent instanceof undici.Agent)) {
throw new Error("Invalid http/https agent");
}
this.agent = extraOptions.agent;
}
}
/**
* Creates a Sender options object by parsing the provided configuration string.
*
* @param {string} configurationString - Configuration string. <br>
* @param {object} extraOptions - Optional extra configuration. <br>
* - 'log' is a logging function used by the <a href="Sender.html">Sender</a>. <br>
* Prototype: <i>(level: 'error'|'warn'|'info'|'debug', message: string) => void</i>. <br>
* - 'agent' is a custom http/https agent used by the <a href="Sender.html">Sender</a> when http/https transport is used. <br>
* A <i>http.Agent</i> or <i>https.Agent</i> object is expected.
*
* @return {SenderOptions} A Sender configuration object initialized from the provided configuration string.
*/ static fromConfig(configurationString, extraOptions = undefined) {
return new SenderOptions(configurationString, extraOptions);
}
/**
* Creates a Sender options object by parsing the configuration string set in the <b>QDB_CLIENT_CONF</b> environment variable.
*
* @param {object} extraOptions - Optional extra configuration. <br>
* - 'log' is a logging function used by the <a href="Sender.html">Sender</a>. <br>
}in /**br>
* - 'agent' is a custom http/https agent used by the <a href="Sender.html">Sender</a> when http/https transport is used. <br>
* A <i>http.Agent</i> or <i>https.Agent</i> object is expected.
*
* @return {SenderOptions} A Sender configuration object initialized from the <b>QDB_CLIENT_CONF</b> environment variable.
*/ static fromEnv(extraOptions = undefined) {
return SenderOptions.fromConfig(process.env.QDB_CLIENT_CONF, extraOptions);
}
}
function parseConfigurationString(options, configString) {
if (!configString) {
throw new Error("Configuration string is missing or empty");
}
const position = parseProtocol(options, configString);
parseSettings(options, configString, position);
parseAddress(options);
parseBufferSizes(options);
parseAutoFlushOptions(options);
parseTlsOptions(options);
parseRequestTimeoutOptions(options);
parseMaxNameLength(options);
parseCopyBuffer(options);
}
function parseSettings(options, configString, position) {
let index = configString.indexOf(";", position);
while(index > -1){
if (index + 1 < configString.length && configString.charAt(index + 1) === ";") {
index = configString.indexOf(";", index + 2);
continue;
}
parseSetting(options, configString, position, index);
position = index + 1;
index = configString.indexOf(";", position);
}
if (position < configString.length) {
parseSetting(options, configString, position, configString.length);
}
}
function parseSetting(options, configString, position, index) {
const setting = configString.slice(position, index).replaceAll(";;", ";");
const equalsIndex = setting.indexOf("=");
if (equalsIndex < 0) {
throw new Error(`Missing '=' sign in '${setting}'`);
}
const key = setting.slice(0, equalsIndex);
const value = setting.slice(equalsIndex + 1);
validateConfigKey(key);
validateConfigValue(key, value);
options[key] = value;
}
const ValidConfigKeys = [
"addr",
"username",
"password",
"token",
"token_x",
"token_y",
"auto_flush",
"auto_flush_rows",
"auto_flush_interval",
"copy_buffer",
"request_min_throughput",
"request_timeout",
"retry_timeout",
"init_buf_size",
"max_buf_size",
"max_name_len",
"tls_verify",
"tls_ca",
"tls_roots",
"tls_roots_password"
];
function validateConfigKey(key) {
if (!ValidConfigKeys.includes(key)) {
throw new Error(`Unknown configuration key: '${key}'`);
}
}
function validateConfigValue(key, value) {
if (!value) {
throw new Error(`Invalid configuration, value is not set for '${key}'`);
}
for(let i = 0; i < value.length; i++){
const unicode = value.codePointAt(i);
if (unicode < 0x20 || unicode > 0x7e && unicode < 0xa0) {
throw new Error(`Invalid configuration, control characters are not allowed: '${value}'`);
}
}
}
function parseProtocol(options, configString) {
const index = configString.indexOf("::");
if (index < 0) {
throw new Error("Missing protocol, configuration string format: 'protocol::key1=value1;key2=value2;key3=value3;'");
}
options.protocol = configString.slice(0, index);
switch(options.protocol){
case HTTP:
case HTTPS:
case TCP:
case TCPS:
break;
default:
throw new Error(`Invalid protocol: '${options.protocol}', accepted protocols: 'http', 'https', 'tcp', 'tcps'`);
}
return index + 2;
}
function parseAddress(options) {
if (!options.addr) {
throw new Error("Invalid configuration, 'addr' is required");
}
const index = options.addr.indexOf(":");
if (index < 0) {
options.host = options.addr;
switch(options.protocol){
case HTTP:
case HTTPS:
options.port = HTTP_PORT;
return;
case TCP:
case TCPS:
options.port = TCP_PORT;
return;
default:
throw new Error(`Invalid protocol: '${options.protocol}', accepted protocols: 'http', 'https', 'tcp', 'tcps'`);
}
}
options.host = options.addr.slice(0, index);
if (!options.host) {
throw new Error(`Host name is required`);
}
const portStr = options.addr.slice(index + 1);
if (!portStr) {
throw new Error(`Port is required`);
}
options.port = Number(portStr);
if (isNaN(options.port)) {
throw new Error(`Invalid port: '${portStr}'`);
}
if (!Number.isInteger(options.port) || options.port < 1) {
throw new Error(`Invalid port: ${options.port}`);
}
}
function parseBufferSizes(options) {
parseInteger(options, "init_buf_size", "initial buffer size", 1);
parseInteger(options, "max_buf_size", "max buffer size", 1);
}
function parseAutoFlushOptions(options) {
parseBoolean(options, "auto_flush", "auto flush");
parseInteger(options, "auto_flush_rows", "auto flush rows", 0);
parseInteger(options, "auto_flush_interval", "auto flush interval", 0);
}
function parseTlsOptions(options) {
parseBoolean(options, "tls_verify", "TLS verify", UNSAFE_OFF);
if (options.tls_roots || options.tls_roots_password) {
throw new Error("'tls_roots' and 'tls_roots_password' options are not supported, please, " + "use the 'tls_ca' option or the NODE_EXTRA_CA_CERTS environment variable instead");
}
}
function parseRequestTimeoutOptions(options) {
parseInteger(options, "request_min_throughput", "request min throughput", 1);
parseInteger(options, "request_timeout", "request timeout", 1);
parseInteger(options, "retry_timeout", "retry timeout", 0);
}
function parseMaxNameLength(options) {
parseInteger(options, "max_name_len", "max name length", 1);
}
function parseCopyBuffer(options) {
parseBoolean(options, "copy_buffer", "copy buffer");
}
function parseBoolean(options, property, description, offValue = OFF) {
if (options[property]) {
const property_str = options[property];
switch(property_str){
case ON:
options[property] = true;
break;
case offValue:
options[property] = false;
break;
default:
throw new Error(`Invalid ${description} option: '${property_str}'`);
}
}
}
function parseInteger(options, property, description, lowerBound) {
if (options[property]) {
const property_str = options[property];
options[property] = Number(property_str);
if (isNaN(options[property])) {
throw new Error(`Invalid ${description} option, not a number: '${property_str}'`);
}
if (!Number.isInteger(options[property]) || options[property] < lowerBound) {
throw new Error(`Invalid ${description} option: ${options[property]}`);
}
}
}
// @ts-check
const HTTP_NO_CONTENT = 204; // success
const DEFAULT_HTTP_AUTO_FLUSH_ROWS = 75000;
const DEFAULT_TCP_AUTO_FLUSH_ROWS = 600;
const DEFAULT_AUTO_FLUSH_INTERVAL = 1000; // 1 sec
const DEFAULT_MAX_NAME_LENGTH = 127;
const DEFAULT_REQUEST_MIN_THROUGHPUT = 102400; // 100 KB/sec
const DEFAULT_REQUEST_TIMEOUT = 10000; // 10 sec
const DEFAULT_RETRY_TIMEOUT = 10000; // 10 sec
const DEFAULT_BUFFER_SIZE = 65536; // 64 KB
const DEFAULT_MAX_BUFFER_SIZE = 104857600; // 100 MB
/** @type {Agent.Options} */ const DEFAULT_HTTP_OPTIONS = {
connect: {
keepAlive: true
},
pipelining: 1,
keepAliveTimeout: 60000
};
// an arbitrary public key, not used in authentication
// only used to construct a valid JWK token which is accepted by the crypto API
const PUBLIC_KEY = {
x: "aultdA0PjhD_cWViqKKyL5chm6H1n-BiZBo_48T-uqc",
y: "__ptaol41JWSpTTL525yVEfzmY8A6Vi_QrW1FjKcHMg"
};
/*
We are retrying on the following response codes (copied from the Rust client):
500: Internal Server Error
503: Service Unavailable
504: Gateway Timeout
// Unofficial extensions
507: Insufficient Storage
509: Bandwidth Limit Exceeded
523: Origin is Unreachable
524: A Timeout Occurred
529: Site is overloaded
599: Network Connect Timeout Error
*/ const RETRIABLE_STATUS_CODES = [
500,
503,
504,
507,
509,
523,
524,
529,
599
];
/** @classdesc
* The QuestDB client's API provides methods to connect to the database, ingest data, and close the connection.
* The supported protocols are HTTP and TCP. HTTP is preferred as it provides feedback in the HTTP response. <br>
* Based on benchmarks HTTP also provides higher throughput, if configured to ingest data in bigger batches.
* <p>
* The client supports authentication. <br>
* Authentication details can be passed to the Sender in its configuration options. <br>
* The client supports Basic username/password and Bearer token authentication methods when used with HTTP protocol,
* and JWK token authentication when ingesting data via TCP. <br>
* Please, note that authentication is enabled by default in QuestDB Enterprise only. <br>
* Details on how to configure authentication in the open source version of
* QuestDB: {@link https://questdb.io/docs/reference/api/ilp/authenticate}
* </p>
* <p>
* The client also supports TLS encryption for both, HTTP and TCP transports to provide a secure connection. <br>
* Please, note that the open source version of QuestDB does not support TLS, and requires an external reverse-proxy,
* such as Nginx to enable encryption.
* </p>
* <p>
* The client uses a buffer to store data. It automatically flushes the buffer by sending its content to the server.
* Auto flushing can be disabled via configuration options to gain control over transactions. Initial and maximum
* buffer sizes can also be set.
* </p>
* <p>
* It is recommended that the Sender is created by using one of the static factory methods,
* <i>Sender.fromConfig(configString, extraOptions)</i> or <i>Sender.fromEnv(extraOptions)</i>).
* If the Sender is created via its constructor, at least the SenderOptions configuration object should be
* initialized from a configuration string to make sure that the parameters are validated. <br>
* Detailed description of the Sender's configuration options can be found in
* the <a href="SenderOptions.html">SenderOptions</a> documentation.
* </p>
* <p>
* Extra options can be provided to the Sender in the <i>extraOptions</i> configuration object. <br>
* A custom logging function and a custom HTTP(S) agent can be passed to the Sender in this object. <br>
* The logger implementation provides the option to direct log messages to the same place where the host application's
* log is saved. The default logger writes to the console. <br>
* The custom HTTP(S) agent option becomes handy if there is a need to modify the default options set for the
* HTTP(S) connections. A popular setting would be disabling persistent connections, in this case an agent can be
* passed to the Sender with <i>keepAlive</i> set to <i>false</i>. <br>
* For example: <i>Sender.fromConfig(`http::addr=host:port`, { agent: new undici.Agent({ connect: { keepAlive: false } })})</i> <br>
* If no custom agent is configured, the Sender will use its own agent which overrides some default values
* of <i>undici.Agent</i>. The Sender's own agent uses persistent connections with 1 minute idle timeout, pipelines requests default to 1.
* </p>
*/ class Sender {
/**
* Creates an instance of Sender.
*
* @param {SenderOptions} options - Sender configuration object. <br>
* See SenderOptions documentation for detailed description of configuration options. <br>
*/ constructor(options){
if (!options || !options.protocol) {
throw new Error("The 'protocol' option is mandatory");
}
replaceDeprecatedOptions(options);
this.log = typeof options.log === "function" ? options.log : log;
switch(options.protocol){
case HTTP:
this.http = true;
this.secure = false;
this.agent = options.agent instanceof undici.Agent ? options.agent : this.getDefaultHttpAgent();
break;
case HTTPS:
this.http = true;
this.secure = true;
this.agent = options.agent instanceof undici.Agent ? options.agent : this.getDefaultHttpAgent();
break;
case TCP:
this.http = false;
this.secure = false;
break;
case TCPS:
this.http = false;
this.secure = true;
break;
default:
throw new Error(`Invalid protocol: '${options.protocol}'`);
}
if (this.http) {
this.username = options.username;
this.password = options.password;
this.token = options.token;
if (!options.port) {
options.port = 9000;
}
} else {
if (!options.auth && !options.jwk) {
constructAuth(options);
}
this.jwk = constructJwk(options);
if (!options.port) {
options.port = 9009;
}
}
this.host = options.host;
this.port = options.port;
this.tlsVerify = isBoolean(options.tls_verify) ? options.tls_verify : true;
this.tlsCA = options.tls_ca ? node_fs.readFileSync(options.tls_ca) : undefined;
this.autoFlush = isBoolean(options.auto_flush) ? options.auto_flush : true;
this.autoFlushRows = isInteger(options.auto_flush_rows, 0) ? options.auto_flush_rows : this.http ? DEFAULT_HTTP_AUTO_FLUSH_ROWS : DEFAULT_TCP_AUTO_FLUSH_ROWS;
this.autoFlushInterval = isInteger(options.auto_flush_interval, 0) ? options.auto_flush_interval : DEFAULT_AUTO_FLUSH_INTERVAL;
this.maxNameLength = isInteger(options.max_name_len, 1) ? options.max_name_len : DEFAULT_MAX_NAME_LENGTH;
this.requestMinThroughput = isInteger(options.request_min_throughput, 0) ? options.request_min_throughput : DEFAULT_REQUEST_MIN_THROUGHPUT;
this.requestTimeout = isInteger(options.request_timeout, 1) ? options.request_timeout : DEFAULT_REQUEST_TIMEOUT;
this.retryTimeout = isInteger(options.retry_timeout, 0) ? options.retry_timeout : DEFAULT_RETRY_TIMEOUT;
const noCopy = isBoolean(options.copy_buffer) && !options.copy_buffer;
this.toBuffer = noCopy ? this.toBufferView : this.toBufferNew;
this.doResolve = noCopy ? (resolve)=>{
compact(this);
resolve(true);
} : (resolve)=>{
resolve(true);
};
this.maxBufferSize = isInteger(options.max_buf_size, 1) ? options.max_buf_size : DEFAULT_MAX_BUFFER_SIZE;
this.resize(isInteger(options.init_buf_size, 1) ? options.init_buf_size : DEFAULT_BUFFER_SIZE);
this.reset();
}
/**
* Creates a Sender options object by parsing the provided configuration string.
*
* @param {string} configurationString - Configuration string. <br>
* @param {object} extraOptions - Optional extra configuration. <br>
* - 'log' is a logging function used by the <a href="Sender.html">Sender</a>. <br>
* Prototype: <i>(level: 'error'|'warn'|'info'|'debug', message: string) => void</i>. <br>
* - 'agent' is a custom Undici agent used by the <a href="Sender.html">Sender</a> when http/https transport is used. <br>
* A <i>undici.Agent</i> object is expected.
*
* @return {Sender} A Sender object initialized from the provided configuration string.
*/ static fromConfig(configurationString, extraOptions = undefined) {
return new Sender(SenderOptions.fromConfig(configurationString, extraOptions));
}
/**
* Creates a Sender options object by parsing the configuration string set in the <b>QDB_CLIENT_CONF</b> environment variable.
*
* @param {object} extraOptions - Optional extra configuration. <br>
* - 'log' is a logging function used by the <a href="Sender.html">Sender</a>. <br>
* Prototype: <i>(level: 'error'|'warn'|'info'|'debug', message: string) => void</i>. <br>
* - 'agent' is a custom Undici agent used by the <a href="Sender.html">Sender</a> when http/https transport is used. <br>
* A <i>undici.Agent</i> object is expected.
*
* @return {Sender} A Sender object initialized from the <b>QDB_CLIENT_CONF</b> environment variable.
*/ static fromEnv(extraOptions = undefined) {
return new Sender(SenderOptions.fromConfig(process.env.QDB_CLIENT_CONF, extraOptions));
}
/**
* Extends the size of the sender's buffer. <br>
* Can be used to increase the size of buffer if overflown.
* The buffer's content is copied into the new buffer.
*
* @param {number} bufferSize - New size of the buffer used by the sender, provided in bytes.
*/ resize(bufferSize) {
if (bufferSize > this.maxBufferSize) {
throw new Error(`Max buffer size is ${this.maxBufferSize} bytes, requested buffer size: ${bufferSize}`);
}
this.bufferSize = bufferSize;
// Allocating an extra byte because Buffer.write() does not fail if the length of the data to be written is
// longer than the size of the buffer. It simply just writes whatever it can, and returns.
// If we can write into the extra byte, that indicates buffer overflow.
// See the check in our write() function.
const newBuffer = node_buffer.Buffer.alloc(this.bufferSize + 1, 0, "utf8");
if (this.buffer) {
this.buffer.copy(newBuffer);
}
this.buffer = newBuffer;
}
/**
* Resets the buffer, data added to the buffer will be lost. <br>
* In other words it clears the buffer and sets the writing position to the beginning of the buffer.
*
* @return {Sender} Returns with a reference to this sender.
*/ reset() {
this.position = 0;
this.lastFlushTime = Date.now();
this.pendingRowCount = 0;
startNewRow(this);
return this;
}
/**
* Creates a TCP connection to the database.
*
* @param {net.NetConnectOpts | tls.ConnectionOptions} connectOptions - Connection options, host and port are required.
*
* @return {Promise<boolean>} Resolves to true if the client is connected.
*/ connect(connectOptions = undefined) {
if (this.http) {
throw new Error("'connect()' should be called only if the sender connects via TCP");
}
if (!connectOptions) {
connectOptions = {
host: this.host,
port: this.port,
ca: this.tlsCA
};
}
if (!connectOptions.host) {
throw new Error("Hostname is not set");
}
if (!connectOptions.port) {
throw new Error("Port is not set");
}
return new Promise((resolve, reject)=>{
if (this.socket) {
throw new Error("Sender connected already");
}
let authenticated = false;
let data;
this.socket = !this.secure ? net__default.default.connect(connectOptions) : tls__default.default.connect(connectOptions, ()=>{
if (authenticated) {
resolve(true);
}
});
this.socket.setKeepAlive(true);
this.socket.on("data", async (raw)=>{
data = !data ? raw : node_buffer.Buffer.concat([
data,
raw
]);
if (!authenticated) {
authenticated = await authenticate(this, data);
if (authenticated) {
resolve(true);
}
} else {
this.log("warn", `Received unexpected data: ${data}`);
}
}).on("ready", async ()=>{
this.log("info", `Successfully connected to ${connectOptions.host}:${connectOptions.port}`);
if (this.jwk) {
this.log("info", `Authenticating with ${connectOptions.host}:${connectOptions.port}`);
await this.socket.write(`${this.jwk.kid}\n`, (err)=>{
if (err) {
reject(err);
}
});
} else {
authenticated = true;
if (!this.secure || !this.tlsVerify) {
resolve(true);
}
}
}).on("error", (err)=>{
this.log("error", err);
if (err.code !== "SELF_SIGNED_CERT_IN_CHAIN" || this.tlsVerify) {
reject(err);
}
});
});
}
/**
* @ignore
* @return {Agent} Returns the default http agent.
*/ getDefaultHttpAgent() {
if (!Sender.DEFAULT_HTTP_AGENT) {
Sender.DEFAULT_HTTP_AGENT = new undici.Agent(DEFAULT_HTTP_OPTIONS);
}
return Sender.DEFAULT_HTTP_AGENT;
}
/**
* Closes the TCP connection to the database. <br>
* Data sitting in the Sender's buffer will be lost unless flush() is called before close().
*/ async close() {
if (this.socket) {
const address = this.socket.remoteAddress;
const port = this.socket.remotePort;
this.socket.destroy();
this.socket = null;
this.log("info", `Connection to ${address}:${port} is closed`);
}
}
/**
* Sends the buffer's content to the database and compacts the buffer.
* If the last row is not finished it stays in the sender's buffer.
*
* @return {Promise<boolean>} Resolves to true when there was data in the buffer to send.
*/ async flush() {
const data = this.toBuffer(this.endOfLastRow);
if (!data) {
return false;
}
if (this.http) {
// const request = this.secure ? https.request : http.request;
const options = createRequestOptions(this, data);
return sendHttp(this, options, data, this.retryTimeout);
} else {
if (!this.socket) {
throw new Error("Sender is not connected");
}
return sendTcp(this, data);
}
}
/**
* @ignore
* @return {Buffer} Returns a cropped buffer ready to send to the server or null if there is nothing to send.
* The returned buffer is backed by the sender's buffer.
*/ toBufferView(pos = this.position) {
return pos > 0 ? this.buffer.subarray(0, pos) : null;
}
/**
* @ignore
* @return {Buffer|null} Returns a cropped buffer ready to send to the server or null if there is nothing to send.
* The returned buffer is a copy of the sender's buffer.
*/ toBufferNew(pos = this.position) {
if (pos > 0) {
const data = node_buffer.Buffer.allocUnsafe(pos);
this.buffer.copy(data, 0, 0, pos);
compact(this);
return data;
}
return null;
}
/**
* Write the table name into the buffer of the sender.
*
* @param {string} table - Table name.
* @return {Sender} Returns with a reference to this sender.
*/ table(table) {
if (typeof table !== "string") {
throw new Error(`Table name must be a string, received ${typeof table}`);
}
if (this.hasTable) {
throw new Error("Table name has already been set");
}
validateTableName(table, this.maxNameLength);
checkCapacity(this, [
table
]);
writeEscaped(this, table);
this.hasTable = true;
return this;
}
/**
* Write a symbol name and value into the buffer of the sender.
*
* @param {string} name - Symbol name.
* @param {any} value - Symbol value, toString() will be called to extract the actual symbol value from the parameter.
* @return {Sender} Returns with a reference to this sender.
*/ symbol(name, value) {
if (typeof name !== "string") {
throw new Error(`Symbol name must be a string, received ${typeof name}`);
}
if (!this.hasTable || this.hasColumns) {
throw new Error("Symbol can be added only after table name is set and before any column added");
}
const valueStr = value.toString();
checkCapacity(this, [
name,
valueStr
], 2 + name.length + valueStr.length);
write(this, ",");
validateColumnName(name, this.maxNameLength);
writeEscaped(this, name);
write(this, "=");
writeEscaped(this, valueStr);
this.hasSymbols = true;
return this;
}
/**
* Write a string column with its value into the buffer of the sender.
*
* @param {string} name - Column name.
* @param {string} value - Column value, accepts only string values.
* @return {Sender} Returns with a reference to this sender.
*/ stringColumn(name, value) {
writeColumn(this, name, value, ()=>{
checkCapacity(this, [
value
], 2 + value.length);
write(this, '"');
writeEscaped(this, value, true);
write(this, '"');
}, "string");
return this;
}
/**
* Write a boolean column with its value into the buffer of the sender.
*
* @param {string} name - Column name.
* @param {boolean} value - Column value, accepts only boolean values.
* @return {Sender} Returns with a reference to this sender.
*/ booleanColumn(name, value) {
writeColumn(this, name, value, ()=>{
checkCapacity(this, [], 1);
write(this, value ? "t" : "f");
}, "boolean");
return this;
}
/**
* Write a float column with its value into the buffer of the sender.
*
* @param {string} name - Column name.
* @param {number} value - Column value, accepts only number values.
* @return {Sender} Returns with a reference to this sender.
*/ floatColumn(name, value) {
writeColumn(this, name, value, ()=>{
const valueStr = value.toString();
checkCapacity(this, [
valueStr
], valueStr.length);
write(this, valueStr);
}, "number");
return this;
}
/**
* Write an integer column with its value into the buffer of the sender.
*
* @param {string} name - Column name.
* @param {number} value - Column value, accepts only number values.
* @return {Sender} Returns with a reference to this sender.
*/ intColumn(name, value) {
if (!Number.isInteger(value)) {
throw new Error(`Value must be an integer, received ${value}`);
}
writeColumn(this, name, value, ()=>{
const valueStr = value.toString();
checkCapacity(this, [
valueStr
], 1 + valueStr.length);
write(this, valueStr);
write(this, "i");
});
return this;
}
/**
* Write a timestamp column with its value into the buffer of the sender.
*
* @param {string} name - Column name.
* @param {number | bigint} value - Epoch timestamp, accepts numbers or BigInts.
* @param {string} [unit=us] - Timestamp unit. Supported values: 'ns' - nanoseconds, 'us' - microseconds, 'ms' - milliseconds. Defaults to 'us'.
* @return {Sender} Returns with a reference to this sender.
*/ timestampColumn(name, value, unit = "us") {
if (typeof value !== "bigint" && !Number.isInteger(value)) {
throw new Error(`Value must be an integer or BigInt, received ${value}`);
}
writeColumn(this, name, value, ()=>{
const valueMicros = timestampToMicros(BigInt(value), unit);
const valueStr = valueMicros.toString();
checkCapacity(this, [
valueStr
], 1 + valueStr.length);
write(this, valueStr);
write(this, "t");
});
return this;
}
/**
* Closing the row after writing the designated timestamp into the buffer of the sender.
*
* @param {number | bigint} timestamp - Designated epoch timestamp, accepts numbers or BigInts.
* @param {string} [unit=us] - Timestamp unit. Supported values: 'ns' - nanoseconds, 'us' - microseconds, 'ms' - milliseconds. Defaults to 'us'.
*/ async at(timestamp, unit = "us") {
if (!this.hasSymbols && !this.hasColumns) {
throw new Error("The row must have a symbol or column set before it is closed");
}
if (typeof timestamp !== "bigint" && !Number.isInteger(timestamp)) {
throw new Error(`Designated timestamp must be an integer or BigInt, received ${timestamp}`);
}
const timestampNanos = timestampToNanos(BigInt(timestamp), unit);
const timestampStr = timestampNanos.toString();
checkCapacity(this, [], 2 + timestampStr.length);
write(this, " ");
write(this, timestampStr);
write(this, "\n");
this.pendingRowCount++;
startNewRow(this);
await autoFlush(this);
}
/**
* Closing the row without writing designated timestamp into the buffer of the sender. <br>
* Designated timestamp will be populated by the server on this record.
*/ async atNow() {
if (!this.hasSymbols && !this.hasColumns) {
throw new Error("The row must have a symbol or column set before it is closed");
}
checkCapacity(this, [], 1);
write(this, "\n");
this.pendingRowCount++;
startNewRow(this);
await autoFlush(this);
}
}
function isBoolean(value) {
return typeof value === "boolean";
}
function isInteger(value, lowerBound) {
return typeof value === "number" && Number.isInteger(value) && value >= lowerBound;
}
async function authenticate(sender, challenge) {
// Check for trailing \n which ends the challenge
if (challenge.subarray(-1).readInt8() === 10) {
const keyObject = crypto__default.default.createPrivateKey({
key: sender.jwk,
format: "jwk"
});
const signature = crypto__default.default.sign("RSA-SHA256", challenge.subarray(0, challenge.length - 1), keyObject);
return new Promise((resolve, reject)=>{
sender.socket.write(`${node_buffer.Buffer.from(signature).toString("base64")}\n`, (err)=>{
if (err) {
reject(err);
} else {
resolve(true);
}
});
});
}
return false;
}
function startNewRow(sender) {
sender.endOfLastRow = sender.position;
sender.hasTable = false;
sender.hasSymbols = false;
sender.hasColumns = false;
}
function createRequestOptions(sender, data) {
const timeoutMillis = data.length / sender.requestMinThroughput * 1000 + sender.requestTimeout;
const options = {
hostname: sender.host,
port: sender.port,
agent: sender.agent,
protocol: sender.secure ? "https" : "http",
path: "/write?precision=n",
method: "POST",
timeout: timeoutMillis
};
return options;
}
async function sendHttp(sender, options, data, retryTimeout) {
const retryBegin = Date.now();
const headers = {};
if (sender.secure) {
sender.agent = new undici.Agent({
...DEFAULT_HTTP_OPTIONS,
connect: {
...DEFAULT_HTTP_OPTIONS.connect,
requestCert: sender.tlsVerify,
rejectUnauthorized: sender.tlsVerify,
ca: sender.tlsCA
}
});
}
const dispatcher = new undici.RetryAgent(sender.agent, {
maxRetries: Infinity,
minTimeout: 10,
maxTimeout: 1000,
timeoutFactor: 2,
retryAfter: true,
methods: [
"GET",
"POST",
"PUT",
"DELETE",
"PATCH",
"OPTIONS",
"HEAD"
],
statusCodes: RETRIABLE_STATUS_CODES,
errorCodes: [
"ECONNRESET",
"EAI_AGAIN",
"ECONNREFUSED",
"ETIMEDOUT",
"EPIPE",
"UND_ERR_CONNECT_TIMEOUT",
"UND_ERR_HEADERS_TIMEOUT",
"UND_ERR_BODY_TIMEOUT"
],
retry (err, context, callback) {
const elapsed = Date.now() - retryBegin;
if (elapsed > retryTimeout) {
// Stop retrying if the total retry timeout is exceeded
return callback(err);
}
return callback(null);
}
});
if (sender.token) {
headers["Authorization"] = "Bearer " + sender.token;
} else if (sender.username && sender.password) {
headers["Authorization"] = "Basic " + node_buffer.Buffer.from(sender.username + ":" + sender.password).toString("base64");
}
try {
const { statusCode, body } = await dispatcher.request({
origin: `${options.protocol}://${options.hostname}:${options.port}`,
path: options.path,
method: options.method,
headers,
body: data,
headersTimeout: sender.requestTimeout
});
const responseBody = await body.arrayBuffer();
if (statusCode === HTTP_NO_CONTENT) {
if (responseBody.byteLength > 0) {
sender.log("warn", `Unexpected message from server: ${responseBody.toString()}`);
}
return true;
} else {
const error = new Error(`HTTP request failed, statusCode=${statusCode}, error=${responseBody.toString()}`);
throw error;
}
} catch (err) {
if (err.code === "UND_ERR_HEADERS_TIMEOUT") {
sender.log("error", `HTTP request timeout, no response from server in time`);
throw new Error(`HTTP request timeout, no response from server in time`);
}
sender.log("error", `HTTP request failed, statusCode=500, error=`);
throw new Error(`HTTP request failed, statusCode=500, error=${err.message}`);
}
}
async function autoFlush(sender) {
if (sender.autoFlush && sender.pendingRowCount > 0 && (sender.autoFlushRows > 0 && sender.pendingRowCount >= sender.autoFlushRows || sender.autoFlushInterval > 0 && Date.now() - sender.lastFlushTime >= sender.autoFlushInterval)) {
await sender.flush();
}
}
function sendTcp(sender, data) {
return new Promise((resolve, reject)=>{
sender.socket.write(data, (err)=>{
if (err) {
reject(err);
} else {
sender.doResolve(resolve);
}
});
});
}
function checkCapacity(sender, data, base = 0) {
let length = base;
for (const str of data){
length += node_buffer.Buffer.byteLength(str, "utf8");
}
if (sender.position + length > sender.bufferSize) {
let newSize = sender.bufferSize;
do {
newSize += sender.bufferSize;
}while (sender.position + length > newSize)
sender.resize(newSize);
}
}
function compact(sender) {
if (sender.endOfLastRow > 0) {
sender.buffer.copy(sender.buffer, 0, sender.endOfLastRow, sender.position);
sender.position = sender.position - sender.endOfLastRow;
sender.endOfLastRow = 0;
sender.lastFlushTime = Date.now();
sender.pendingRowCount = 0;
}
}
function writeColumn(sender, name, value, writeValue, valueType) {
if (typeof name !== "string") {
throw new Error(`Column name must be a string, received ${typeof name}`);
}
if (valueType != null && typeof value !== valueType) {
throw new Error(`Column value must be of type ${valueType}, received ${typeof value}`);
}
if (!sender.hasTable) {
throw new Error("Column can be set only after table name is set");
}
checkCapacity(sender, [
name
], 2 + name.length);
write(sender, sender.hasColumns ? "," : " ");
validateColumnName(name, sender.maxNameLength);
writeEscaped(sender, name);
write(sender, "=");
writeValue();
sender.hasColumns = true;
}
function write(sender, data) {
sender.position += sender.buffer.write(data, sender.position);
if (sender.position > sender.bufferSize) {
throw new Error(`Buffer overflow [position=${sender.position}, bufferSize=${sender.bufferSize}]`);
}
}
function writeEscaped(sender, data, quoted = false) {
for (const ch of data){
if (ch > "\\") {
write(sender, ch);
continue;
}
switch(ch){
case " ":
case ",":
case "=":
if (!quoted) {
write(sender, "\\");
}
write(sender, ch);
break;
case "\n":
case "\r":
write(sender, "\\");
write(sender, ch);
break;
case '"':
if (quoted) {
write(sender, "\\");
}
write(sender, ch);
break;
case "\\":
write(sender, "\\\\");
break;
default:
write(sender, ch);
break;
}
}
}
function timestampToMicros(timestamp, unit) {
switch(unit){
case "ns":
return timestamp / 1000n;
case "us":
return timestamp;
case "ms":
return timestamp * 1000n;
default:
throw new Error("Unknown timestamp unit: " + unit);
}
}
function timestampToNanos(timestamp, unit) {
switch(unit){
case "ns":
return timestamp;
case "us":
return timestamp * 1000n;
case "ms":
return timestamp * 1000_000n;
default:
throw new Error("Unknown timestamp unit: " + unit);
}
}
function replaceDeprecatedOptions(options) {
// deal with deprecated options
if (options.copyBuffer) {
options.copy_buffer = options.copyBuffer;
options.copyBuffer = undefined;
}
if (options.bufferSize) {
options.init_buf_size = options.bufferSize;
options.bufferSize = undefined;
}
}
function constructAuth(options) {
if (!options.username && !options.token && !options.password) {
// no intention to authenticate
return;
}
if (!options.username || !options.token) {
throw new Error("TCP transport requires a username and a private key for authentication, " + "please, specify the 'username' and 'token' config options");
}
options.auth = {
keyId: options.username,
token: options.token
};
}
function constructJwk(options) {
if (options.auth) {
if (!options.auth.keyId) {
throw new Error("Missing username, please, specify the 'keyId' property of the 'auth' config option. " + "For example: new Sender({protocol: 'tcp', host: 'host', auth: {keyId: 'username', token: 'private key'}})");
}
if (typeof options.auth.keyId !== "string") {
throw new Error("Please, specify the 'keyId' property of the 'auth' config option as a string. " + "For example: new Sender({protocol: 'tcp', host: 'host', auth: {keyId: 'username', token: 'private key'}})");
}
if (!options.auth.token) {
throw new Error("Missing private key, please, specify the 'token' property of the 'auth' config option. " + "For example: new Sender({protocol: 'tcp', host: 'host', auth: {keyId: 'username', token: 'private key'}})");
}
if (typeof options.auth.token !== "string") {
throw new Error("Please, specify the 'token' property of the 'auth' config option as a string. " + "For example: new Sender({protocol: 'tcp', host: 'host', auth: {keyId: 'username', token: 'private key'}})");
}
return {
kid: options.auth.keyId,
d: options.auth.token,
...PUBLIC_KEY,
kty: "EC",
crv: "P-256"
};
} else {
return options.jwk;
}
}
exports.Sender = Sender;