Source: sender.js

'use strict';

/* eslint-disable no-unused-vars */

const { readFileSync } = require("fs");
const { Buffer } = require('buffer');
const { log } = require('./logging');
const { validateTableName, validateColumnName } = require('./validation');
const { SenderOptions, HTTP, HTTPS, TCP, TCPS } = require('./options');
const http = require('http');
const https = require('https');
const net = require('net');
const tls = require('tls');
const crypto = require('crypto');

// HTTP status code treated as a successful write by the HTTP transport
const HTTP_NO_CONTENT = 204; // success

// Auto flush defaults, used when the matching option is missing or invalid:
// - row count thresholds differ for the HTTP and the TCP transports
// - the interval is compared against elapsed milliseconds since the last flush
const DEFAULT_HTTP_AUTO_FLUSH_ROWS = 75000;
const DEFAULT_TCP_AUTO_FLUSH_ROWS = 600;
const DEFAULT_AUTO_FLUSH_INTERVAL = 1000; // 1 sec

// Default maximum length passed to the table and column name validators
const DEFAULT_MAX_NAME_LENGTH = 127;

// HTTP request defaults:
// - the request timeout is extended by (payload size / minimum throughput) seconds
// - the retry timeout limits for how long a failed request is retried
const DEFAULT_REQUEST_MIN_THROUGHPUT = 102400; // 100 KB/sec
const DEFAULT_REQUEST_TIMEOUT = 10000; // 10 sec
const DEFAULT_RETRY_TIMEOUT = 10000; // 10 sec

// Default initial size of the sender's buffer, and the limit it is allowed to grow to
const DEFAULT_BUFFER_SIZE = 65536; //  64 KB
const DEFAULT_MAX_BUFFER_SIZE = 104857600; // 100 MB

// default options for HTTP agent
// - persistent connections with 1 minute idle timeout, server side has 5 minutes set by default
// - max open connections is set to 256, same as server side default
// The same configuration object is used for both, the http and the https default agents.
const DEFAULT_HTTP_AGENT_CONFIG = {
    maxSockets: 256,
    keepAlive: true,
    timeout: 60000 // 1 min
}

// an arbitrary public key, not used in authentication
// only used to construct a valid JWK token which is accepted by the crypto API
const PUBLIC_KEY = {
    x: 'aultdA0PjhD_cWViqKKyL5chm6H1n-BiZBo_48T-uqc',
    y: '__ptaol41JWSpTTL525yVEfzmY8A6Vi_QrW1FjKcHMg'
};

/** @classdesc
 * The QuestDB client's API provides methods to connect to the database, ingest data, and close the connection.
 * The supported protocols are HTTP and TCP. HTTP is preferred as it provides feedback in the HTTP response. <br>
 * Based on benchmarks HTTP also provides higher throughput, if configured to ingest data in bigger batches.
 * <p>
 * The client supports authentication. <br>
 * Authentication details can be passed to the Sender in its configuration options. <br>
 * The client supports Basic username/password and Bearer token authentication methods when used with HTTP protocol,
 * and JWK token authentication when ingesting data via TCP. <br>
 * Please, note that authentication is enabled by default in QuestDB Enterprise only. <br>
 * Details on how to configure authentication in the open source version of
 * QuestDB: {@link https://questdb.io/docs/reference/api/ilp/authenticate}
 * </p>
 * <p>
 * The client also supports TLS encryption for both, HTTP and TCP transports to provide a secure connection. <br>
 * Please, note that the open source version of QuestDB does not support TLS, and requires an external reverse-proxy,
 * such as Nginx to enable encryption.
 * </p>
 * <p>
 * The client uses a buffer to store data. It automatically flushes the buffer by sending its content to the server.
 * Auto flushing can be disabled via configuration options to gain control over transactions. Initial and maximum
 * buffer sizes can also be set.
 * </p>
 * <p>
 * It is recommended that the Sender is created by using one of the static factory methods,
 * <i>Sender.fromConfig(configString, extraOptions)</i> or <i>Sender.fromEnv(extraOptions)</i>.
 * If the Sender is created via its constructor, at least the SenderOptions configuration object should be
 * initialized from a configuration string to make sure that the parameters are validated. <br>
 * Detailed description of the Sender's configuration options can be found in
 * the <a href="SenderOptions.html">SenderOptions</a> documentation.
 * </p>
 * <p>
 * Extra options can be provided to the Sender in the <i>extraOptions</i> configuration object. <br>
 * A custom logging function and a custom HTTP(S) agent can be passed to the Sender in this object. <br>
 * The logger implementation provides the option to direct log messages to the same place where the host application's
 * log is saved. The default logger writes to the console. <br>
 * The custom HTTP(S) agent option becomes handy if there is a need to modify the default options set for the
 * HTTP(S) connections. A popular setting would be disabling persistent connections, in this case an agent can be
 * passed to the Sender with <i>keepAlive</i> set to <i>false</i>. <br>
 * For example: <i>Sender.fromConfig(`http::addr=host:port`, { agent: new http.Agent({ keepAlive: false })})</i> <br>
 * If no custom agent is configured, the Sender will use its own agent which overrides some default values
 * of <i>http.Agent</i>/<i>https.Agent</i>. The Sender's own agent uses persistent connections with 1 minute idle
 * timeout, and limits the number of open connections to the server, which is set to 256 for each host.
 * </p>
 */
class Sender {
    /** @private */ static DEFAULT_HTTP_AGENT;
    /** @private */ static DEFAULT_HTTPS_AGENT;

    /** @private */ http;       // true if the protocol is HTTP/HTTPS, false if it is TCP/TCPS
    /** @private */ secure;     // true if the protocol is HTTPS or TCPS, false otherwise
    /** @private */ host;
    /** @private */ port;

    /** @private */ socket;

    /** @private */ username;
    /** @private */ password;
    /** @private */ token;
    /** @private */ jwk;        // set for TCP/TCPS only, used to authenticate after the connection is established

    /** @private */ tlsVerify;
    /** @private */ tlsCA;

    /** @private */ bufferSize;
    /** @private */ maxBufferSize;
    /** @private */ buffer;
    /** @private */ toBuffer;
    /** @private */ doResolve;
    /** @private */ position;
    /** @private */ endOfLastRow;

    /** @private */ autoFlush;
    /** @private */ autoFlushRows;
    /** @private */ autoFlushInterval;
    /** @private */ lastFlushTime;
    /** @private */ pendingRowCount;

    /** @private */ requestMinThroughput;
    /** @private */ requestTimeout;
    /** @private */ retryTimeout;

    /** @private */ hasTable;
    /** @private */ hasSymbols;
    /** @private */ hasColumns;

    /** @private */ maxNameLength;

    /** @private */ log;
    /** @private */ agent;

    /**
     * Creates an instance of Sender.
     *
     * @param {SenderOptions} options - Sender configuration object. <br>
     * See SenderOptions documentation for detailed description of configuration options. <br>
     * Options which are missing or fail the type/range check fall back to the defaults.
     *
     * @throws {Error} If the options object or the protocol is missing, or the protocol is not supported.
     */
    constructor(options) {
        if (!options || !options.protocol) {
            throw new Error('The \'protocol\' option is mandatory');
        }
        replaceDeprecatedOptions(options);

        this.log = typeof options.log === 'function' ? options.log : log;

        switch (options.protocol) {
            case HTTP:
                this.http = true;
                this.secure = false;
                this.agent = options.agent instanceof http.Agent ? options.agent : this.getDefaultHttpAgent();
                break;
            case HTTPS:
                this.http = true;
                this.secure = true;
                this.agent = options.agent instanceof https.Agent ? options.agent : this.getDefaultHttpsAgent();
                break;
            case TCP:
                this.http = false;
                this.secure = false;
                break;
            case TCPS:
                this.http = false;
                this.secure = true;
                break;
            default:
                throw new Error(`Invalid protocol: '${options.protocol}'`);
        }

        if (this.http) {
            this.username = options.username;
            this.password = options.password;
            this.token = options.token;
            if (!options.port) {
                options.port = 9000;
            }
        } else {
            if (!options.auth && !options.jwk) {
                constructAuth(options);
            }
            this.jwk = constructJwk(options);
            if (!options.port) {
                options.port = 9009;
            }
        }

        this.host = options.host;
        this.port = options.port;

        this.tlsVerify = isBoolean(options.tls_verify) ? options.tls_verify : true;
        this.tlsCA = options.tls_ca ? readFileSync(options.tls_ca) : undefined;

        this.autoFlush = isBoolean(options.auto_flush) ? options.auto_flush : true;
        this.autoFlushRows = isInteger(options.auto_flush_rows, 0) ? options.auto_flush_rows : (this.http ? DEFAULT_HTTP_AUTO_FLUSH_ROWS : DEFAULT_TCP_AUTO_FLUSH_ROWS);
        this.autoFlushInterval = isInteger(options.auto_flush_interval, 0) ? options.auto_flush_interval : DEFAULT_AUTO_FLUSH_INTERVAL;

        this.maxNameLength = isInteger(options.max_name_len, 1) ? options.max_name_len : DEFAULT_MAX_NAME_LENGTH;

        this.requestMinThroughput = isInteger(options.request_min_throughput, 1) ? options.request_min_throughput : DEFAULT_REQUEST_MIN_THROUGHPUT;
        this.requestTimeout = isInteger(options.request_timeout, 1) ? options.request_timeout : DEFAULT_REQUEST_TIMEOUT;
        this.retryTimeout = isInteger(options.retry_timeout, 0) ? options.retry_timeout : DEFAULT_RETRY_TIMEOUT;

        // copy_buffer=false: the data sent is a view of the sender's buffer, hence the buffer can be compacted
        // only after the data is sent; otherwise a copy is sent, and the buffer is compacted when the copy is taken
        const noCopy = isBoolean(options.copy_buffer) && !options.copy_buffer;
        this.toBuffer = noCopy ? this.toBufferView : this.toBufferNew;
        this.doResolve = noCopy
            ? (resolve) => {
                compact(this);
                resolve(true);
            }
            : (resolve) => {
                resolve(true);
            }
        this.maxBufferSize = isInteger(options.max_buf_size, 1) ? options.max_buf_size : DEFAULT_MAX_BUFFER_SIZE;
        this.resize(isInteger(options.init_buf_size, 1) ? options.init_buf_size : DEFAULT_BUFFER_SIZE);
        this.reset();
    }

    /**
     * Creates a Sender options object by parsing the provided configuration string.
     *
     * @param {string} configurationString - Configuration string. <br>
     * @param {object} extraOptions - Optional extra configuration. <br>
     * - 'log' is a logging function used by the <a href="Sender.html">Sender</a>. <br>
     * Prototype: <i>(level: 'error'|'warn'|'info'|'debug', message: string) => void</i>. <br>
     * - 'agent' is a custom http/https agent used by the <a href="Sender.html">Sender</a> when http/https transport is used. <br>
     * A <i>http.Agent</i> or <i>https.Agent</i> object is expected.
     *
     * @return {Sender} A Sender object initialized from the provided configuration string.
     */
    static fromConfig(configurationString, extraOptions = undefined) {
        return new Sender(SenderOptions.fromConfig(configurationString, extraOptions));
    }

    /**
     * Creates a Sender options object by parsing the configuration string set in the <b>QDB_CLIENT_CONF</b> environment variable.
     *
     * @param {object} extraOptions - Optional extra configuration. <br>
     * - 'log' is a logging function used by the <a href="Sender.html">Sender</a>. <br>
     * Prototype: <i>(level: 'error'|'warn'|'info'|'debug', message: string) => void</i>. <br>
     * - 'agent' is a custom http/https agent used by the <a href="Sender.html">Sender</a> when http/https transport is used. <br>
     * A <i>http.Agent</i> or <i>https.Agent</i> object is expected.
     *
     * @return {Sender} A Sender object initialized from the <b>QDB_CLIENT_CONF</b> environment variable.
     */
    static fromEnv(extraOptions = undefined) {
        return new Sender(SenderOptions.fromConfig(process.env.QDB_CLIENT_CONF, extraOptions));
    }

    /**
     * Extends the size of the sender's buffer. <br>
     * Can be used to increase the size of buffer if overflown.
     * The buffer's content is copied into the new buffer.
     *
     * @param {number} bufferSize - New size of the buffer used by the sender, provided in bytes.
     *
     * @throws {Error} If the requested size is greater than the maximum buffer size.
     */
    resize(bufferSize) {
        if (bufferSize > this.maxBufferSize) {
            throw new Error(`Max buffer size is ${this.maxBufferSize} bytes, requested buffer size: ${bufferSize}`);
        }
        this.bufferSize = bufferSize;
        // Allocating an extra byte because Buffer.write() does not fail if the length of the data to be written is
        // longer than the size of the buffer. It simply just writes whatever it can, and returns.
        // If we can write into the extra byte, that indicates buffer overflow.
        // See the check in our write() function.
        const newBuffer = Buffer.alloc(this.bufferSize + 1, 0, 'utf8');
        if (this.buffer) {
            this.buffer.copy(newBuffer);
        }
        this.buffer = newBuffer;
    }

    /**
     * Resets the buffer, data added to the buffer will be lost. <br>
     * In other words it clears the buffer and sets the writing position to the beginning of the buffer.
     *
     * @return {Sender} Returns with a reference to this sender.
     */
    reset() {
        this.position = 0;
        this.lastFlushTime = Date.now();
        this.pendingRowCount = 0;
        startNewRow(this);
        return this;
    }

    /**
     * Creates a TCP connection to the database.
     *
     * @param {net.NetConnectOpts | tls.ConnectionOptions} connectOptions - Connection options, host and port are required. <br>
     * If omitted, the options are built from the sender's host, port and TLS settings.
     *
     * @return {Promise<boolean>} Resolves to true if the client is connected. <br>
     * Rejects if the sender is connected already, or if the connection or the authentication fails.
     *
     * @throws {Error} If the sender uses the HTTP transport, or the hostname or the port is not set.
     */
    connect(connectOptions = undefined) {
        if (this.http) {
            throw new Error('\'connect()\' should be called only if the sender connects via TCP');
        }

        if (!connectOptions) {
            connectOptions = {
                host: this.host,
                port: this.port,
                ca: this.tlsCA,
                // honour the tls_verify setting the same way as the HTTPS transport does,
                // otherwise the TLS layer tears the connection down when the certificate check fails
                rejectUnauthorized: this.tlsVerify
            };
        }
        if (!connectOptions.host) {
            throw new Error('Hostname is not set');
        }
        if (!connectOptions.port) {
            throw new Error('Port is not set');
        }

        return new Promise((resolve, reject) => {
            if (this.socket) {
                // thrown inside the executor, surfaces as a rejection of the returned promise
                throw new Error('Sender connected already');
            }

            let authenticated = false;
            let data;

            this.socket = !this.secure
                ? net.connect(connectOptions)
                : tls.connect(connectOptions, () => {
                    if (authenticated) {
                        resolve(true);
                    }
                });
            this.socket.setKeepAlive(true);

            this.socket.on('data', async raw => {
                // the challenge can arrive in multiple chunks
                data = !data ? raw : Buffer.concat([data, raw]);
                if (!authenticated) {
                    try {
                        authenticated = await authenticate(this, data);
                    } catch (err) {
                        // a failed authentication has to fail the connect() call,
                        // instead of ending up as an unhandled rejection of this event handler
                        reject(err);
                        return;
                    }
                    if (authenticated) {
                        resolve(true);
                    }
                } else {
                    this.log('warn', `Received unexpected data: ${data}`);
                }
            })
            .on('ready', () => {
                this.log('info', `Successfully connected to ${connectOptions.host}:${connectOptions.port}`);
                if (this.jwk) {
                    this.log('info', `Authenticating with ${connectOptions.host}:${connectOptions.port}`);
                    // the key id is sent first, the server is expected to reply with the challenge
                    this.socket.write(`${this.jwk.kid}\n`, err => {
                        if (err) {
                            reject(err);
                        }
                    });
                } else {
                    authenticated = true;
                    if (!this.secure || !this.tlsVerify) {
                        resolve(true);
                    }
                }
            })
            .on('error', err => {
                this.log('error', err);
                if (err.code !== 'SELF_SIGNED_CERT_IN_CHAIN' || this.tlsVerify) {
                    reject(err);
                }
            });
        });
    }

    /**
     * @ignore
     * @return {http.Agent} Returns the default http agent, the agent is created on first use and shared by all senders.
     */
    getDefaultHttpAgent() {
        if (!Sender.DEFAULT_HTTP_AGENT) {
            Sender.DEFAULT_HTTP_AGENT = new http.Agent(DEFAULT_HTTP_AGENT_CONFIG);
        }
        return Sender.DEFAULT_HTTP_AGENT;
    }

    /**
     * @ignore
     * @return {https.Agent} Returns the default https agent, the agent is created on first use and shared by all senders.
     */
    getDefaultHttpsAgent() {
        if (!Sender.DEFAULT_HTTPS_AGENT) {
            Sender.DEFAULT_HTTPS_AGENT = new https.Agent(DEFAULT_HTTP_AGENT_CONFIG);
        }
        return Sender.DEFAULT_HTTPS_AGENT;
    }

    /**
     * Closes the TCP connection to the database. <br>
     * Data sitting in the Sender's buffer will be lost unless flush() is called before close().
     */
    async close() {
        if (this.socket) {
            const address = this.socket.remoteAddress;
            const port = this.socket.remotePort;
            this.socket.destroy();
            this.socket = null;
            this.log('info', `Connection to ${address}:${port} is closed`);
        }
    }

    /**
     * Sends the buffer's content to the database and compacts the buffer.
     * If the last row is not finished it stays in the sender's buffer.
     *
     * @return {Promise<boolean>} Resolves to true when there was data in the buffer to send.
     * Rejects with 'Sender is not connected' if there is data to send via TCP, but connect() has not been called;
     * the pending rows stay in the buffer in this case.
     */
    async flush() {
        // same condition toBuffer() uses to decide that there is nothing to send
        if (!(this.endOfLastRow > 0)) {
            return false;
        }
        // the connection has to be checked before the data is taken out of the buffer,
        // taking a copy compacts the buffer, and the rows would be lost
        if (!this.http && !this.socket) {
            throw new Error('Sender is not connected');
        }

        const data = this.toBuffer(this.endOfLastRow);
        if (!data) {
            return false;
        }

        if (this.http) {
            const request = this.secure ? https.request : http.request;
            const options = createRequestOptions(this, data);
            return sendHttp(this, request, options, data, this.retryTimeout);
        }
        return sendTcp(this, data);
    }

    /**
     * @ignore
     * @return {Buffer} Returns a cropped buffer ready to send to the server or null if there is nothing to send.
     * The returned buffer is backed by the sender's buffer.
     */
    toBufferView(pos = this.position) {
        return pos > 0
            ? this.buffer.subarray(0, pos)
            : null;
    }

    /**
     * @ignore
     * @return {Buffer} Returns a cropped buffer ready to send to the server or null if there is nothing to send.
     * The returned buffer is a copy of the sender's buffer, the sender's buffer is compacted after the copy is taken.
     */
    toBufferNew(pos = this.position) {
        if (pos > 0) {
            const data = Buffer.allocUnsafe(pos);
            this.buffer.copy(data, 0, 0, pos);
            compact(this);
            return data;
        }
        return null;
    }

    /**
     * Write the table name into the buffer of the sender.
     *
     * @param {string} table - Table name.
     * @return {Sender} Returns with a reference to this sender.
     *
     * @throws {Error} If the table name is not a string, or it has been set for the row already.
     */
    table(table) {
        if (typeof table !== 'string') {
            throw new Error(`Table name must be a string, received ${typeof table}`);
        }
        if (this.hasTable) {
            throw new Error('Table name has already been set');
        }
        validateTableName(table, this.maxNameLength);
        // table.length extra bytes are reserved for the escape characters, in the worst case every character is escaped
        checkCapacity(this, [table], table.length);
        writeEscaped(this, table);
        this.hasTable = true;
        return this;
    }

    /**
     * Write a symbol name and value into the buffer of the sender.
     *
     * @param {string} name - Symbol name.
     * @param {any} value - Symbol value, toString() will be called to extract the actual symbol value from the parameter.
     * @return {Sender} Returns with a reference to this sender.
     *
     * @throws {Error} If the symbol name is not a string, or the symbol is not added right after the table name or another symbol.
     */
    symbol(name, value) {
        if (typeof name !== 'string') {
            throw new Error(`Symbol name must be a string, received ${typeof name}`);
        }
        if (!this.hasTable || this.hasColumns) {
            throw new Error('Symbol can be added only after table name is set and before any column added');
        }
        const valueStr = value.toString();
        // the name is validated before anything is written,
        // a rejected name should not leave a dangling separator in the buffer
        validateColumnName(name, this.maxNameLength);
        // the lengths added on top of the byte lengths are reserved for the escape characters
        checkCapacity(this, [name, valueStr], 2 + name.length + valueStr.length);
        write(this, ',');
        writeEscaped(this, name);
        write(this, '=');
        writeEscaped(this, valueStr);
        this.hasSymbols = true;
        return this;
    }

    /**
     * Write a string column with its value into the buffer of the sender.
     *
     * @param {string} name - Column name.
     * @param {string} value - Column value, accepts only string values.
     * @return {Sender} Returns with a reference to this sender.
     */
    stringColumn(name, value) {
        writeColumn(this, name, value, () => {
            checkCapacity(this, [value], 2 + value.length);
            write(this, '"');
            writeEscaped(this, value, true);
            write(this, '"');
        }, 'string');
        return this;
    }

    /**
     * Write a boolean column with its value into the buffer of the sender.
     *
     * @param {string} name - Column name.
     * @param {boolean} value - Column value, accepts only boolean values.
     * @return {Sender} Returns with a reference to this sender.
     */
    booleanColumn(name, value) {
        writeColumn(this, name, value, () => {
            checkCapacity(this, [], 1);
            write(this, value ? 't' : 'f');
        }, 'boolean');
        return this;
    }

    /**
     * Write a float column with its value into the buffer of the sender.
     *
     * @param {string} name - Column name.
     * @param {number} value - Column value, accepts only number values.
     * @return {Sender} Returns with a reference to this sender.
     */
    floatColumn(name, value) {
        writeColumn(this, name, value, () => {
            const valueStr = value.toString();
            checkCapacity(this, [valueStr], valueStr.length);
            write(this, valueStr);
        }, 'number');
        return this;
    }

    /**
     * Write an integer column with its value into the buffer of the sender.
     *
     * @param {string} name - Column name.
     * @param {number} value - Column value, accepts only number values.
     * @return {Sender} Returns with a reference to this sender.
     *
     * @throws {Error} If the value is not an integer.
     */
    intColumn(name, value) {
        if (!Number.isInteger(value)) {
            throw new Error(`Value must be an integer, received ${value}`);
        }
        writeColumn(this, name, value, () => {
            const valueStr = value.toString();
            checkCapacity(this, [valueStr], 1 + valueStr.length);
            write(this, valueStr);
            write(this, 'i');
        });
        return this;
    }

    /**
     * Write a timestamp column with its value into the buffer of the sender.
     *
     * @param {string} name - Column name.
     * @param {number | bigint} value - Epoch timestamp, accepts numbers or BigInts.
     * @param {string} [unit=us] - Timestamp unit. Supported values: 'ns' - nanoseconds, 'us' - microseconds, 'ms' - milliseconds. Defaults to 'us'.
     * @return {Sender} Returns with a reference to this sender.
     *
     * @throws {Error} If the value is not an integer or BigInt.
     */
    timestampColumn(name, value, unit = 'us') {
        if (typeof value !== 'bigint' && !Number.isInteger(value)) {
            throw new Error(`Value must be an integer or BigInt, received ${value}`);
        }
        writeColumn(this, name, value, () => {
            // timestamp columns are written in microseconds
            const valueMicros = timestampToMicros(BigInt(value), unit);
            const valueStr = valueMicros.toString();
            checkCapacity(this, [valueStr], 1 + valueStr.length);
            write(this, valueStr);
            write(this, 't');
        });
        return this;
    }

    /**
     * Closing the row after writing the designated timestamp into the buffer of the sender.
     *
     * @param {number | bigint} timestamp - Designated epoch timestamp, accepts numbers or BigInts.
     * @param {string} [unit=us] - Timestamp unit. Supported values: 'ns' - nanoseconds, 'us' - microseconds, 'ms' - milliseconds. Defaults to 'us'.
     *
     * @throws {Error} If the row has no symbol or column, or the timestamp is not an integer or BigInt.
     */
    async at(timestamp, unit = 'us') {
        if (!this.hasSymbols && !this.hasColumns) {
            throw new Error('The row must have a symbol or column set before it is closed');
        }
        if (typeof timestamp !== 'bigint' && !Number.isInteger(timestamp)) {
            throw new Error(`Designated timestamp must be an integer or BigInt, received ${timestamp}`);
        }
        // the designated timestamp is written in nanoseconds
        const timestampNanos = timestampToNanos(BigInt(timestamp), unit);
        const timestampStr = timestampNanos.toString();
        checkCapacity(this, [], 2 + timestampStr.length);
        write(this, ' ');
        write(this, timestampStr);
        write(this, '\n');
        this.pendingRowCount++;
        startNewRow(this);
        await autoFlush(this);
    }

    /**
     * Closing the row without writing designated timestamp into the buffer of the sender. <br>
     * Designated timestamp will be populated by the server on this record.
     *
     * @throws {Error} If the row has no symbol or column.
     */
    async atNow() {
        if (!this.hasSymbols && !this.hasColumns) {
            throw new Error('The row must have a symbol or column set before it is closed');
        }
        checkCapacity(this, [], 1);
        write(this, '\n');
        this.pendingRowCount++;
        startNewRow(this);
        await autoFlush(this);
    }
}

/**
 * Tells whether the value is a primitive boolean.
 *
 * @param {any} value - Value to check.
 * @return {boolean} True only for the primitives true and false.
 */
function isBoolean(value) {
    return value === true || value === false;
}

/**
 * Tells whether the value is an integer number which is not less than the lower bound.
 *
 * @param {any} value - Value to check.
 * @param {number} lowerBound - Smallest accepted value, inclusive.
 * @return {boolean} True if the value is an integer and value >= lowerBound.
 */
function isInteger(value, lowerBound) {
    // Number.isInteger() is false for anything which is not of type number
    if (!Number.isInteger(value)) {
        return false;
    }
    return value >= lowerBound;
}

/**
 * Answers the authentication challenge received from the server. <br>
 * The challenge is considered complete when it is terminated by a line feed. The challenge, without
 * the line feed, is signed with the sender's JWK private key, and the base64 encoded signature
 * followed by a line feed is written to the sender's socket.
 *
 * @param {Sender} sender - Sender holding the JWK (<i>sender.jwk</i>) and the socket (<i>sender.socket</i>).
 * @param {Buffer} challenge - Bytes received from the server so far.
 * @return {Promise<boolean>} Resolves to false if the challenge is not complete yet (nothing is sent),
 * resolves to true when the signature has been written to the socket. <br>
 * Rejects if the key cannot be created, signing fails, or the socket write fails.
 */
async function authenticate(sender, challenge) {
    const LINE_FEED = 10;
    // Check for trailing \n which ends the challenge, an empty chunk is an incomplete challenge too
    if (challenge.length === 0 || challenge[challenge.length - 1] !== LINE_FEED) {
        return false;
    }

    // both crypto calls are synchronous, errors thrown here reject the returned promise
    const keyObject = crypto.createPrivateKey({ key: sender.jwk, format: 'jwk' });
    const signature = crypto.sign(
        'RSA-SHA256',
        challenge.subarray(0, challenge.length - 1),
        keyObject
    );

    return new Promise((resolve, reject) => {
        sender.socket.write(`${Buffer.from(signature).toString('base64')}\n`, err => {
            err ? reject(err) : resolve(true);
        });
    });
}

/**
 * Marks the current writing position as the end of the last finished row,
 * and clears the flags tracking what has been written for the row in progress.
 *
 * @param {Sender} sender - Sender to update.
 */
function startNewRow(sender) {
    Object.assign(sender, {
        endOfLastRow: sender.position,
        hasTable: false,
        hasSymbols: false,
        hasColumns: false
    });
}

/**
 * Builds the options of the HTTP(S) request used to send the data to the server.
 *
 * @param {Sender} sender - Sender providing the host, port, agent, timeout and TLS settings.
 * @param {Buffer} data - Data to be sent, its length is used to scale the request timeout.
 * @return {object} Request options for <i>http.request()</i>/<i>https.request()</i>. <br>
 * The TLS options (<i>rejectUnauthorized</i>, <i>ca</i>) are present only if the sender is secure.
 */
function createRequestOptions(sender, data) {
    // time allowed for the transfer, based on the minimum expected throughput
    const transferMillis = (data.length / sender.requestMinThroughput) * 1000;
    const tlsOptions = sender.secure
        ? { rejectUnauthorized: sender.tlsVerify, ca: sender.tlsCA }
        : {};
    return {
        hostname: sender.host,
        port: sender.port,
        agent: sender.agent,
        path: '/write?precision=n',
        method: 'POST',
        timeout: transferMillis + sender.requestTimeout,
        ...tlsOptions
    };
}

/**
 * Sends the data to the server in a HTTP(S) POST request, and retries the request on retryable failures.
 *
 * @param {Sender} sender - Sender providing the credentials, the logger and the <i>doResolve</i> callback.
 * @param {function} request - Function creating the request, <i>http.request</i> or <i>https.request</i>.
 * @param {object} options - Request options.
 * @param {Buffer} data - Request body.
 * @param {number} retryTimeout - Time limit of retrying in milliseconds, no retry if it is not greater than 0.
 * @param {number} [retryBegin=-1] - Time of the first failure, negative if the request has not failed yet.
 * @param {number} [retryInterval=-1] - Current delay between retries in milliseconds, doubled on each retry up to 1 second.
 * @return {Promise<boolean>} Resolves to true if the server replied with 204, possibly after some retries. <br>
 * Rejects with the last error if the failure is not retryable, or the retry timeout has elapsed.
 */
function sendHttp(sender, request, options, data, retryTimeout, retryBegin = -1, retryInterval = -1) {
    return new Promise((resolve, reject) => {
        let statusCode = -1;
        // the promise is settled, or a retry is scheduled, only once,
        // no matter how many failure events are emitted by the request and the response
        let settled = false;

        const fail = err => {
            if (settled) {
                return;
            }
            settled = true;

            // if the error is thrown while the request is sent, statusCode is -1 => no retry
            // request timeout comes through with statusCode 524 => retry
            // if the request fails after the response is received, the statusCode is taken from the response => retry depends on statusCode
            if (!isRetryable(statusCode) || retryTimeout <= 0) {
                reject(err);
                return;
            }
            if (retryBegin < 0) {
                retryBegin = Date.now();
                retryInterval = 10;
            } else if (Date.now() - retryBegin > retryTimeout) {
                reject(err);
                return;
            }
            const jitter = Math.floor(Math.random() * 10) - 5;
            setTimeout(() => {
                retryInterval = Math.min(retryInterval * 2, 1000);
                sendHttp(sender, request, options, data, retryTimeout, retryBegin, retryInterval)
                    .then(() => resolve(true))
                    .catch(e => reject(e));
            }, retryInterval + jitter);
        };

        const req = request(options, response => {
            const responseStatus = response.statusCode;
            statusCode = responseStatus;

            const body = [];
            // the error is created only when the response is over, so it can include the message sent by the server
            const failWithBody = () => {
                fail(new Error(`HTTP request failed, statusCode=${responseStatus}, error=${Buffer.concat(body)}`));
            };

            response
                .on('data', chunk => {
                    body.push(chunk);
                })
                .on('error', err => {
                    sender.log('error', `resp err=${err}`);
                    if (responseStatus !== HTTP_NO_CONTENT) {
                        failWithBody();
                    }
                })
                .on('end', () => {
                    if (responseStatus !== HTTP_NO_CONTENT) {
                        failWithBody();
                        return;
                    }
                    if (settled) {
                        return;
                    }
                    if (body.length > 0) {
                        sender.log('warn', `Unexpected message from server: ${Buffer.concat(body)}`);
                    }
                    settled = true;
                    sender.doResolve(resolve);
                })
                .on('close', () => {
                    // covers failed responses which are closed without 'end' or 'error' being emitted
                    if (responseStatus !== HTTP_NO_CONTENT) {
                        failWithBody();
                    }
                });
        });

        if (sender.token) {
            req.setHeader('Authorization', 'Bearer ' + sender.token);
        } else if (sender.username && sender.password) {
            req.setHeader('Authorization', 'Basic ' + Buffer.from(sender.username + ':' + sender.password).toString('base64'));
        }

        req.on('timeout', () => {
            // set a retryable error code
            statusCode = 524;
            req.destroy(new Error('HTTP request timeout, no response from server in time'));
        });
        req.on('error', fail);
        req.write(data, err => {
            if (err && !settled) {
                settled = true;
                reject(err);
            }
        });
        req.end();
    });
}

/*
We are retrying on the following response codes (copied from the Rust client):
500:  Internal Server Error
503:  Service Unavailable
504:  Gateway Timeout

// Unofficial extensions
507:  Insufficient Storage
509:  Bandwidth Limit Exceeded
523:  Origin is Unreachable
524:  A Timeout Occurred
529:  Site is overloaded
599:  Network Connect Timeout Error
*/
/**
 * Tells whether a request which failed with the given status code should be retried.
 *
 * @param {number} statusCode - HTTP status code, or the internal code set for the failure.
 * @return {boolean} True if the status code is one of the retryable codes listed above.
 */
function isRetryable(statusCode) {
    switch (statusCode) {
        case 500:
        case 503:
        case 504:
        case 507:
        case 509:
        case 523:
        case 524:
        case 529:
        case 599:
            return true;
        default:
            return false;
    }
}

/**
 * Flushes the sender's buffer if auto flush is enabled, there are pending rows, and either
 * the row count threshold is reached or the flush interval has elapsed since the last flush. <br>
 * A threshold set to 0 disables the related check.
 *
 * @param {Sender} sender - Sender to flush.
 * @return {Promise<void>} Resolves when the flush, if any, is completed.
 */
async function autoFlush(sender) {
    if (!sender.autoFlush || !(sender.pendingRowCount > 0)) {
        return;
    }

    const rowLimitReached = sender.autoFlushRows > 0
        && sender.pendingRowCount >= sender.autoFlushRows;
    // the clock is read only if the decision cannot be made based on the row count
    const intervalElapsed = () => sender.autoFlushInterval > 0
        && Date.now() - sender.lastFlushTime >= sender.autoFlushInterval;

    if (rowLimitReached || intervalElapsed()) {
        await sender.flush();
    }
}

/**
 * Writes the data to the sender's socket.
 *
 * @param {Sender} sender - Connected sender, provides the socket and the <i>doResolve</i> callback.
 * @param {Buffer} data - Data to be sent.
 * @return {Promise<boolean>} Settled when the write callback is called: rejects with the write error,
 * otherwise the promise is resolved by <i>sender.doResolve()</i>.
 */
function sendTcp(sender, data) {
    return new Promise((resolve, reject) => {
        const onWritten = err => {
            if (err) {
                reject(err);
                return;
            }
            sender.doResolve(resolve);
        };
        sender.socket.write(data, onWritten);
    });
}

/**
 * Makes sure that the sender's buffer has enough room for the data to be written. <br>
 * If the data does not fit, the buffer is resized to the smallest multiple of its current size
 * (at least the double of it) which can accommodate the data.
 *
 * @param {Sender} sender - Sender owning the buffer.
 * @param {string[]} data - Strings to be written, their UTF-8 byte lengths are added to the required size.
 * @param {number} [base=0] - Additional number of bytes required.
 */
function checkCapacity(sender, data, base = 0) {
    let required = base;
    for (const text of data) {
        required += Buffer.byteLength(text, 'utf8');
    }
    const requiredEnd = sender.position + required;
    if (!(requiredEnd > sender.bufferSize)) {
        return;
    }

    const step = sender.bufferSize;
    let grownSize = step + step;
    while (requiredEnd > grownSize) {
        grownSize += step;
    }
    sender.resize(grownSize);
}

/**
 * Removes the finished rows from the beginning of the sender's buffer. <br>
 * The content of the unfinished row, if any, is moved to the beginning of the buffer, the last flush time
 * is set to the current time, and the pending row count is cleared.
 * Nothing happens if there is no finished row in the buffer.
 *
 * @param {Sender} sender - Sender owning the buffer.
 */
function compact(sender) {
    const finishedBytes = sender.endOfLastRow;
    if (!(finishedBytes > 0)) {
        return;
    }

    const { buffer } = sender;
    buffer.copy(buffer, 0, finishedBytes, sender.position);
    sender.position -= finishedBytes;
    sender.endOfLastRow = 0;

    sender.lastFlushTime = Date.now();
    sender.pendingRowCount = 0;
}

/**
 * Writes a column, its separator, name and value, into the sender's buffer. <br>
 * The column is written as a unit: if anything fails after the checks (buffer capacity, the value writer),
 * the writing position is restored, so no partial column is left in the buffer.
 *
 * @param {Sender} sender - Sender owning the buffer.
 * @param {string} name - Column name.
 * @param {any} value - Column value, used only for the type check here.
 * @param {function} writeValue - Callback writing the value into the buffer.
 * @param {string} [valueType] - Expected <i>typeof</i> the value, the type is not checked if omitted.
 *
 * @throws {Error} If the name is not a string, the value is not of the expected type,
 * the table name has not been set for the row, or the name is rejected by the validator. <br>
 * Errors thrown while the column is written are rethrown after the position is restored.
 */
function writeColumn(sender, name, value, writeValue, valueType) {
    if (typeof name !== 'string') {
        throw new Error(`Column name must be a string, received ${typeof name}`);
    }
    if (valueType != null && typeof value !== valueType) {
        throw new Error(`Column value must be of type ${valueType}, received ${typeof value}`);
    }
    if (!sender.hasTable) {
        throw new Error('Column can be set only after table name is set');
    }
    // the name is validated before anything is written into the buffer
    validateColumnName(name, sender.maxNameLength);

    const columnStart = sender.position;
    try {
        // name.length extra bytes are reserved for the escape characters, 2 bytes for the separator and '='
        checkCapacity(sender, [name], 2 + name.length);
        write(sender, sender.hasColumns ? ',' : ' ');
        writeEscaped(sender, name);
        write(sender, '=');
        writeValue();
    } catch (err) {
        // drop the partially written column
        sender.position = columnStart;
        throw err;
    }
    sender.hasColumns = true;
}

/**
 * Writes the string at the sender's current position, and advances the position by the number of bytes written.
 *
 * @param {Sender} sender - Sender owning the buffer.
 * @param {string} data - String to be written.
 *
 * @throws {Error} If the position moves beyond the buffer size after the write, which indicates an overflow.
 */
function write(sender, data) {
    const bytesWritten = sender.buffer.write(data, sender.position);
    sender.position += bytesWritten;
    if (sender.position <= sender.bufferSize) {
        return;
    }
    throw new Error(`Buffer overflow [position=${sender.position}, bufferSize=${sender.bufferSize}]`);
}

/**
 * Writes a string into the sender's buffer character by character, escaping with a backslash where needed:
 * <ul>
 * <li>space, comma and equals sign are escaped unless the string is quoted</li>
 * <li>new line and carriage return are always escaped</li>
 * <li>double quote is escaped only if the string is quoted</li>
 * <li>backslash is always written as two backslashes</li>
 * </ul>
 * Every other character is written as is.
 *
 * @param {object} sender - Sender instance the characters are written to.
 * @param {string} data - The text to write.
 * @param {boolean} [quoted=false] - Whether the text is written inside double quotes.
 */
function writeEscaped(sender, data, quoted = false) {
    // tells if the character has to be preceded by a backslash
    const needsBackslash = (ch) => {
        if (ch === ' ' || ch === ',' || ch === '=') {
            return !quoted;
        }
        if (ch === '\n' || ch === '\r') {
            return true;
        }
        if (ch === '"') {
            return quoted;
        }
        return false;
    };

    for (const ch of data) {
        if (ch > '\\') {
            // fast path, none of the special characters sorts after the backslash
            write(sender, ch);
        } else if (ch === '\\') {
            write(sender, '\\\\');
        } else {
            if (needsBackslash(ch)) {
                write(sender, '\\');
            }
            write(sender, ch);
        }
    }
}

/**
 * Converts a timestamp to microseconds.
 *
 * @param {bigint} timestamp - The timestamp in the given unit.
 * @param {string} unit - Unit of the timestamp: 'ns' (nanoseconds), 'us' (microseconds) or 'ms' (milliseconds).
 * @return {bigint} The timestamp in microseconds; nanoseconds are divided by 1000 using bigint division.
 * @throws {Error} If the unit is not one of the supported values.
 */
function timestampToMicros(timestamp, unit) {
    if (unit === 'ns') {
        return timestamp / 1000n;
    }
    if (unit === 'us') {
        return timestamp;
    }
    if (unit === 'ms') {
        return timestamp * 1000n;
    }
    throw new Error('Unknown timestamp unit: ' + unit);
}

/**
 * Converts a timestamp to nanoseconds.
 *
 * @param {bigint} timestamp - The timestamp in the given unit.
 * @param {string} unit - Unit of the timestamp: 'ns' (nanoseconds), 'us' (microseconds) or 'ms' (milliseconds).
 * @return {bigint} The timestamp in nanoseconds.
 * @throws {Error} If the unit is not one of the supported values.
 */
function timestampToNanos(timestamp, unit) {
    if (unit === 'ns') {
        return timestamp;
    }
    if (unit === 'us') {
        return timestamp * 1000n;
    }
    if (unit === 'ms') {
        return timestamp * 1000_000n;
    }
    throw new Error('Unknown timestamp unit: ' + unit);
}

/**
 * Maps deprecated option names to their current equivalents, modifying the options object in place:
 * <ul>
 * <li><i>copyBuffer</i> is moved to <i>copy_buffer</i></li>
 * <li><i>bufferSize</i> is moved to <i>init_buf_size</i></li>
 * </ul>
 * After the move the deprecated property is set to undefined.
 *
 * @param {object} options - Sender configuration options.
 */
function replaceDeprecatedOptions(options) {
    // an explicit 'copyBuffer: false' has to be carried over too,
    // a truthiness check would silently drop it
    if (options.copyBuffer != null) {
        options.copy_buffer = options.copyBuffer;
        options.copyBuffer = undefined;
    }
    // truthiness check is kept here on purpose, a zero buffer size is never forwarded
    if (options.bufferSize) {
        options.init_buf_size = options.bufferSize;
        options.bufferSize = undefined;
    }
}

/**
 * Builds the <i>auth</i> config option from the <i>username</i> and <i>token</i> options.
 * If none of <i>username</i>, <i>token</i> and <i>password</i> is set, the options are left untouched.
 *
 * @param {object} options - Sender configuration options; <i>options.auth</i> is set on success.
 * @throws {Error} If authentication was requested but the username or the token is missing.
 */
function constructAuth(options) {
    const { username, token, password } = options;

    const authRequested = username || token || password;
    if (!authRequested) {
        // no intention to authenticate
        return;
    }

    if (!(username && token)) {
        throw new Error('TCP transport requires a username and a private key for authentication, ' +
            'please, specify the \'username\' and \'token\' config options');
    }

    options.auth = { keyId: username, token: token };
}

/**
 * Builds the JWK used for authentication from the <i>auth</i> config option.
 * If the <i>auth</i> option is not present, the <i>jwk</i> config option is returned as is.
 *
 * @param {object} options - Sender configuration options.
 * @return {object} A JWK made of the key id, the private key, the module's PUBLIC_KEY coordinates
 * and the 'EC' / 'P-256' key type and curve; or <i>options.jwk</i> when there is no <i>auth</i> option.
 * @throws {Error} If <i>auth.keyId</i> or <i>auth.token</i> is missing or not a string.
 */
function constructJwk(options) {
    const { auth } = options;
    if (!auth) {
        return options.jwk;
    }

    // every validation error ends with the same usage example
    const example = 'For example: new Sender({protocol: \'tcp\', host: \'host\', auth: {keyId: \'username\', token: \'private key\'}})';
    const { keyId, token } = auth;

    if (!keyId) {
        throw new Error('Missing username, please, specify the \'keyId\' property of the \'auth\' config option. ' + example);
    }
    if (typeof keyId !== 'string') {
        throw new Error('Please, specify the \'keyId\' property of the \'auth\' config option as a string. ' + example);
    }
    if (!token) {
        throw new Error('Missing private key, please, specify the \'token\' property of the \'auth\' config option. ' + example);
    }
    if (typeof token !== 'string') {
        throw new Error('Please, specify the \'token\' property of the \'auth\' config option as a string. ' + example);
    }

    return {
        kid: keyId,
        d: token,
        ...PUBLIC_KEY,
        kty: 'EC',
        crv: 'P-256'
    };
}

// public API of the module: the Sender class and the default buffer size limits
Object.assign(exports, {
    Sender,
    DEFAULT_BUFFER_SIZE,
    DEFAULT_MAX_BUFFER_SIZE
});