flow like the river

This commit is contained in:
root 2025-11-07 00:06:12 +01:00
commit 013fe673f3
42435 changed files with 5764238 additions and 0 deletions

View file

@ -0,0 +1,89 @@
/// <reference types="node" />
/// <reference types="node" />
import { URL } from 'url';
import { ConnectionOptions as TlsConnectionOptions } from 'tls';
import Diagnostic from '../Diagnostic';
import { kCaFingerprint } from '../symbols';
import { Connection, ConnectionOptions, BaseConnection } from '../connection';
import { HttpAgentOptions, UndiciAgentOptions, agentFn, ApiKeyAuth, BasicAuth, BearerAuth, nodeFilterFn, nodeSelectorFn } from '../types';
type AddConnectionOptions = string | ConnectionOptions;
export interface ConnectionPoolOptions {
tls?: TlsConnectionOptions;
agent?: HttpAgentOptions | UndiciAgentOptions | agentFn | false;
proxy?: string | URL;
auth?: BasicAuth | ApiKeyAuth | BearerAuth;
diagnostic?: Diagnostic;
Connection: typeof BaseConnection;
pingTimeout?: number;
resurrectStrategy?: 'none' | 'ping' | 'optimistic';
caFingerprint?: string;
}
export interface GetConnectionOptions {
filter?: nodeFilterFn;
selector?: nodeSelectorFn;
now: number;
requestId: string | number;
name: string | symbol;
context: any;
}
export default class BaseConnectionPool {
connections: Connection[];
size: number;
Connection: typeof BaseConnection;
diagnostic: Diagnostic;
auth?: BasicAuth | ApiKeyAuth | BearerAuth;
_agent?: HttpAgentOptions | UndiciAgentOptions | agentFn | false;
_proxy?: string | URL;
_tls?: TlsConnectionOptions;
[kCaFingerprint]?: string;
constructor(opts: ConnectionPoolOptions);
markAlive(connection: Connection): this;
markDead(connection: Connection): this;
getConnection(opts: GetConnectionOptions): Connection | null;
/**
* Creates a new connection instance.
*/
createConnection(opts: string | ConnectionOptions): Connection;
/**
* Adds a new connection to the pool.
*
* @param {object|string} host
* @returns {ConnectionPool}
*/
addConnection(connection: AddConnectionOptions | AddConnectionOptions[]): this;
/**
* Removes a new connection to the pool.
*
* @param {object} connection
* @returns {ConnectionPool}
*/
removeConnection(connection: Connection): this;
/**
* Empties the connection pool.
*
* @returns {ConnectionPool}
*/
empty(): Promise<void>;
/**
* Update the ConnectionPool with new connections.
*
* @param {array} array of connections
* @returns {ConnectionPool}
*/
update(nodes: Array<Connection | ConnectionOptions>): this;
/**
* Transforms the nodes objects to a host object.
*
* @param {object} nodes
* @returns {array} hosts
*/
nodesToHost(nodes: Record<string, any>, protocol: string): ConnectionOptions[];
/**
* Transforms an url string to a host object
*
* @param {string} url
* @returns {object} host
*/
urlToHost(url: string): ConnectionOptions;
}
export {};

View file

@ -0,0 +1,296 @@
"use strict";
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
var _a;
Object.defineProperty(exports, "__esModule", { value: true });
const tslib_1 = require("tslib");
const url_1 = require("url");
const debug_1 = tslib_1.__importDefault(require("debug"));
const Diagnostic_1 = tslib_1.__importDefault(require("../Diagnostic"));
const symbols_1 = require("../symbols");
const connection_1 = require("../connection");
const errors_1 = require("../errors");
const debug = (0, debug_1.default)('elasticsearch');
class BaseConnectionPool {
constructor(opts) {
var _b;
Object.defineProperty(this, "connections", {
enumerable: true,
configurable: true,
writable: true,
value: void 0
});
Object.defineProperty(this, "size", {
enumerable: true,
configurable: true,
writable: true,
value: void 0
});
Object.defineProperty(this, "Connection", {
enumerable: true,
configurable: true,
writable: true,
value: void 0
});
Object.defineProperty(this, "diagnostic", {
enumerable: true,
configurable: true,
writable: true,
value: void 0
});
Object.defineProperty(this, "auth", {
enumerable: true,
configurable: true,
writable: true,
value: void 0
});
Object.defineProperty(this, "_agent", {
enumerable: true,
configurable: true,
writable: true,
value: void 0
});
Object.defineProperty(this, "_proxy", {
enumerable: true,
configurable: true,
writable: true,
value: void 0
});
Object.defineProperty(this, "_tls", {
enumerable: true,
configurable: true,
writable: true,
value: void 0
});
Object.defineProperty(this, _a, {
enumerable: true,
configurable: true,
writable: true,
value: void 0
});
// list of nodes and weights
this.connections = [];
// how many nodes we have in our scheduler
this.size = this.connections.length;
this.Connection = opts.Connection;
this.diagnostic = (_b = opts.diagnostic) !== null && _b !== void 0 ? _b : new Diagnostic_1.default();
this.auth = opts.auth;
this._tls = opts.tls;
this._agent = opts.agent;
this._proxy = opts.proxy;
this[symbols_1.kCaFingerprint] = opts.caFingerprint;
}
markAlive(connection) {
connection.status = connection_1.BaseConnection.statuses.ALIVE;
return this;
}
markDead(connection) {
connection.status = connection_1.BaseConnection.statuses.DEAD;
return this;
}
getConnection(opts) {
throw new errors_1.ConfigurationError('The getConnection method should be implemented by extended classes');
}
/**
* Creates a new connection instance.
*/
createConnection(opts) {
if (typeof opts === 'string') {
opts = this.urlToHost(opts);
}
if (this.auth != null) {
opts.auth = this.auth;
}
else if (opts.url.username !== '' && opts.url.password !== '') {
opts.auth = {
username: decodeURIComponent(opts.url.username),
password: decodeURIComponent(opts.url.password)
};
}
/* istanbul ignore else */
if (opts.tls == null)
opts.tls = this._tls;
/* istanbul ignore else */
if (opts.agent == null)
opts.agent = this._agent;
/* istanbul ignore else */
if (opts.proxy == null)
opts.proxy = this._proxy;
/* istanbul ignore else */
if (opts.diagnostic == null)
opts.diagnostic = this.diagnostic;
/* istanbul ignore else */
if (opts.caFingerprint == null)
opts.caFingerprint = this[symbols_1.kCaFingerprint];
const connection = new this.Connection(opts);
for (const conn of this.connections) {
if (conn.id === connection.id) {
throw new Error(`Connection with id '${connection.id}' is already present`);
}
}
return connection;
}
/**
* Adds a new connection to the pool.
*
* @param {object|string} host
* @returns {ConnectionPool}
*/
addConnection(connection) {
if (Array.isArray(connection)) {
const connections = [];
for (const conn of connection) {
connections.push(this.createConnection(conn));
}
return this.update([...this.connections, ...connections]);
}
else {
return this.update([...this.connections, this.createConnection(connection)]);
}
}
/**
* Removes a new connection to the pool.
*
* @param {object} connection
* @returns {ConnectionPool}
*/
removeConnection(connection) {
debug('Removing connection', connection);
return this.update(this.connections.filter(c => c.id !== connection.id));
}
/**
* Empties the connection pool.
*
* @returns {ConnectionPool}
*/
async empty() {
debug('Emptying the connection pool');
const connections = this.connections;
this.connections = [];
this.size = 0;
for (const connection of connections) {
await connection.close();
}
}
/**
* Update the ConnectionPool with new connections.
*
* @param {array} array of connections
* @returns {ConnectionPool}
*/
update(nodes) {
debug('Updating the connection pool');
const newConnections = [];
const oldConnections = [];
for (const node of nodes) {
// if we already have a given connection in the pool
// we mark it as alive and we do not close the connection
// to avoid socket issues
const connectionById = this.connections.find(c => c.id === node.id);
const connectionByUrl = this.connections.find(c => c.id === node.url.href);
if (connectionById != null) {
debug(`The connection with id '${node.id}' is already present`);
this.markAlive(connectionById);
newConnections.push(connectionById);
// in case the user has passed a single url (or an array of urls),
// the connection id will be the full href; to avoid closing valid connections
// because are not present in the pool, we check also the node url,
// and if is already present we update its id with the ES provided one.
}
else if (connectionByUrl != null) {
connectionByUrl.id = node.id;
this.markAlive(connectionByUrl);
newConnections.push(connectionByUrl);
}
else {
if (node instanceof connection_1.BaseConnection) {
newConnections.push(node);
}
else {
newConnections.push(this.createConnection(node));
}
}
}
const ids = nodes.map(c => c.id);
// remove all the dead connections and old connections
for (const connection of this.connections) {
if (!ids.includes(connection.id)) {
oldConnections.push(connection);
}
}
// close old connections
for (const connection of oldConnections) {
connection.close().catch(/* istanbul ignore next */ () => { });
}
this.connections = newConnections;
this.size = this.connections.length;
return this;
}
/**
* Transforms the nodes objects to a host object.
*
* @param {object} nodes
* @returns {array} hosts
*/
nodesToHost(nodes, protocol) {
const ids = Object.keys(nodes);
const hosts = [];
for (let i = 0, len = ids.length; i < len; i++) {
const node = nodes[ids[i]];
// If there is no protocol in
// the `publish_address` new URL will throw
// the publish_address can have two forms:
// - ip:port
// - hostname/ip:port
// if we encounter the second case, we should
// use the hostname instead of the ip
let address = node.http.publish_address;
const parts = address.split('/');
// the url is in the form of hostname/ip:port
if (parts.length > 1) {
const hostname = parts[0];
const port = parts[1].match(/((?::))(?:[0-9]+)$/g)[0].slice(1);
address = `${hostname}:${port}`;
}
address = address.slice(0, 4) === 'http'
/* istanbul ignore next */
? address
: `${protocol}//${address}`;
hosts.push({
url: new url_1.URL(address),
id: ids[i]
});
}
return hosts;
}
/**
* Transforms an url string to a host object
*
* @param {string} url
* @returns {object} host
*/
urlToHost(url) {
return {
url: new url_1.URL(url)
};
}
}
exports.default = BaseConnectionPool;
_a = symbols_1.kCaFingerprint;
//# sourceMappingURL=BaseConnectionPool.js.map

File diff suppressed because one or more lines are too long

View file

@ -0,0 +1,25 @@
import BaseConnectionPool, { ConnectionPoolOptions, GetConnectionOptions } from './BaseConnectionPool';
import { Connection, ConnectionOptions } from '../connection';
export default class CloudConnectionPool extends BaseConnectionPool {
cloudConnection: Connection | null;
constructor(opts: ConnectionPoolOptions);
/**
* Returns the only cloud connection.
*
* @returns {object} connection
*/
getConnection(opts: GetConnectionOptions): Connection | null;
/**
* Empties the connection pool.
*
* @returns {ConnectionPool}
*/
empty(): Promise<void>;
/**
* Update the ConnectionPool with new connections.
*
* @param {array} array of connections
* @returns {ConnectionPool}
*/
update(connections: Array<Connection | ConnectionOptions>): this;
}

View file

@ -0,0 +1,64 @@
"use strict";
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
Object.defineProperty(exports, "__esModule", { value: true });
const tslib_1 = require("tslib");
const BaseConnectionPool_1 = tslib_1.__importDefault(require("./BaseConnectionPool"));
class CloudConnectionPool extends BaseConnectionPool_1.default {
constructor(opts) {
super(opts);
Object.defineProperty(this, "cloudConnection", {
enumerable: true,
configurable: true,
writable: true,
value: void 0
});
this.cloudConnection = null;
}
/**
* Returns the only cloud connection.
*
* @returns {object} connection
*/
getConnection(opts) {
return this.cloudConnection;
}
/**
* Empties the connection pool.
*
* @returns {ConnectionPool}
*/
async empty() {
await super.empty();
this.cloudConnection = null;
}
/**
* Update the ConnectionPool with new connections.
*
* @param {array} array of connections
* @returns {ConnectionPool}
*/
update(connections) {
super.update(connections);
this.cloudConnection = this.connections[0];
return this;
}
}
exports.default = CloudConnectionPool;
//# sourceMappingURL=CloudConnectionPool.js.map

View file

@ -0,0 +1 @@
{"version":3,"file":"CloudConnectionPool.js","sourceRoot":"","sources":["../../src/pool/CloudConnectionPool.ts"],"names":[],"mappings":";AAAA;;;;;;;;;;;;;;;;;GAiBG;;;AAEH,sFAG6B;AAG7B,MAAqB,mBAAoB,SAAQ,4BAAkB;IAEjE,YAAa,IAA2B;QACtC,KAAK,CAAC,IAAI,CAAC,CAAA;QAFb;;;;;WAAkC;QAGhC,IAAI,CAAC,eAAe,GAAG,IAAI,CAAA;IAC7B,CAAC;IAED;;;;OAIG;IACH,aAAa,CAAE,IAA0B;QACvC,OAAO,IAAI,CAAC,eAAe,CAAA;IAC7B,CAAC;IAED;;;;OAIG;IACH,KAAK,CAAC,KAAK;QACT,MAAM,KAAK,CAAC,KAAK,EAAE,CAAA;QACnB,IAAI,CAAC,eAAe,GAAG,IAAI,CAAA;IAC7B,CAAC;IAED;;;;;OAKG;IACH,MAAM,CAAE,WAAkD;QACxD,KAAK,CAAC,MAAM,CAAC,WAAW,CAAC,CAAA;QACzB,IAAI,CAAC,eAAe,GAAG,IAAI,CAAC,WAAW,CAAC,CAAC,CAAC,CAAA;QAC1C,OAAO,IAAI,CAAA;IACb,CAAC;CACF;AArCD,sCAqCC"}

View file

@ -0,0 +1,77 @@
import BaseConnectionPool, { ConnectionPoolOptions, GetConnectionOptions } from './BaseConnectionPool';
import { Connection, ConnectionOptions } from '../connection';
export interface ResurrectOptions {
now: number;
requestId: string | number;
name: string | symbol;
context: any;
}
export interface ResurrectEvent {
strategy: string;
name: string | symbol;
request: {
id: string;
};
isAlive: boolean;
connection: Connection;
}
export default class ClusterConnectionPool extends BaseConnectionPool {
dead: string[];
resurrectTimeout: number;
resurrectTimeoutCutoff: number;
pingTimeout: number;
resurrectStrategy: number;
static resurrectStrategies: {
none: number;
ping: number;
optimistic: number;
};
constructor(opts: ConnectionPoolOptions);
/**
* Marks a connection as 'alive'.
* If needed removes the connection from the dead list
* and then resets the `deadCount`.
*
* @param {object} connection
*/
markAlive(connection: Connection): this;
/**
* Marks a connection as 'dead'.
* If needed adds the connection to the dead list
* and then increments the `deadCount`.
*
* @param {object} connection
*/
markDead(connection: Connection): this;
/**
* If enabled, tries to resurrect a connection with the given
* resurrect strategy ('ping', 'optimistic', 'none').
*
* @param {object} { now, requestId }
*/
resurrect(opts: ResurrectOptions): void;
/**
* Returns an alive connection if present,
* otherwise returns a dead connection.
* By default it filters the `master` only nodes.
* It uses the selector to choose which
* connection return.
*
* @param {object} options (filter and selector)
* @returns {object|null} connection
*/
getConnection(opts: GetConnectionOptions): Connection | null;
/**
* Empties the connection pool.
*
* @returns {ConnectionPool}
*/
empty(): Promise<void>;
/**
* Update the ConnectionPool with new connections.
*
* @param {array} array of connections
* @returns {ConnectionPool}
*/
update(connections: Array<Connection | ConnectionOptions>): this;
}

View file

@ -0,0 +1,259 @@
"use strict";
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
Object.defineProperty(exports, "__esModule", { value: true });
const tslib_1 = require("tslib");
const BaseConnectionPool_1 = tslib_1.__importDefault(require("./BaseConnectionPool"));
const assert_1 = tslib_1.__importDefault(require("assert"));
const debug_1 = tslib_1.__importDefault(require("debug"));
const connection_1 = require("../connection");
const debug = (0, debug_1.default)('elasticsearch');
class ClusterConnectionPool extends BaseConnectionPool_1.default {
constructor(opts) {
var _a, _b;
super(opts);
Object.defineProperty(this, "dead", {
enumerable: true,
configurable: true,
writable: true,
value: void 0
});
Object.defineProperty(this, "resurrectTimeout", {
enumerable: true,
configurable: true,
writable: true,
value: void 0
});
Object.defineProperty(this, "resurrectTimeoutCutoff", {
enumerable: true,
configurable: true,
writable: true,
value: void 0
});
Object.defineProperty(this, "pingTimeout", {
enumerable: true,
configurable: true,
writable: true,
value: void 0
});
Object.defineProperty(this, "resurrectStrategy", {
enumerable: true,
configurable: true,
writable: true,
value: void 0
});
this.dead = [];
// the resurrect timeout is 60s
this.resurrectTimeout = 1000 * 60;
// number of consecutive failures after which
// the timeout doesn't increase
this.resurrectTimeoutCutoff = 5;
this.pingTimeout = (_a = opts.pingTimeout) !== null && _a !== void 0 ? _a : 3000;
const resurrectStrategy = (_b = opts.resurrectStrategy) !== null && _b !== void 0 ? _b : 'ping';
this.resurrectStrategy = ClusterConnectionPool.resurrectStrategies[resurrectStrategy];
(0, assert_1.default)(this.resurrectStrategy != null, `Invalid resurrection strategy: '${resurrectStrategy}'`);
}
/**
* Marks a connection as 'alive'.
* If needed removes the connection from the dead list
* and then resets the `deadCount`.
*
* @param {object} connection
*/
markAlive(connection) {
const { id } = connection;
debug(`Marking as 'alive' connection '${id}'`);
const index = this.dead.indexOf(id);
if (index > -1)
this.dead.splice(index, 1);
connection.status = connection_1.BaseConnection.statuses.ALIVE;
connection.deadCount = 0;
connection.resurrectTimeout = 0;
return this;
}
/**
* Marks a connection as 'dead'.
* If needed adds the connection to the dead list
* and then increments the `deadCount`.
*
* @param {object} connection
*/
markDead(connection) {
const { id } = connection;
debug(`Marking as 'dead' connection '${id}'`);
if (!this.dead.includes(id)) {
// It might happen that `markDead` is called jsut after
// a pool update, and in such case we will add to the dead
// list a node that no longer exist. The following check verify
// that the connection is still part of the pool before
// marking it as dead.
for (let i = 0; i < this.size; i++) {
if (this.connections[i].id === id) {
this.dead.push(id);
break;
}
}
}
connection.status = connection_1.BaseConnection.statuses.DEAD;
connection.deadCount++;
// resurrectTimeout formula:
// `resurrectTimeout * 2 ** min(deadCount - 1, resurrectTimeoutCutoff)`
connection.resurrectTimeout = Date.now() + this.resurrectTimeout * Math.pow(2, Math.min(connection.deadCount - 1, this.resurrectTimeoutCutoff));
// sort the dead list in ascending order
// based on the resurrectTimeout
this.dead.sort((a, b) => {
const conn1 = this.connections.find(c => c.id === a);
const conn2 = this.connections.find(c => c.id === b);
return conn1.resurrectTimeout - conn2.resurrectTimeout;
});
return this;
}
/**
* If enabled, tries to resurrect a connection with the given
* resurrect strategy ('ping', 'optimistic', 'none').
*
* @param {object} { now, requestId }
*/
resurrect(opts) {
if (this.resurrectStrategy === 0 || this.dead.length === 0) {
debug('Nothing to resurrect');
return;
}
// the dead list is sorted in ascending order based on the timeout
// so the first element will always be the one with the smaller timeout
const connection = this.connections.find(c => c.id === this.dead[0]);
if (opts.now < connection.resurrectTimeout) {
debug('Nothing to resurrect');
return;
}
const { id } = connection;
// ping strategy
if (this.resurrectStrategy === 1) {
connection.request({ method: 'HEAD', path: '/' }, { timeout: this.pingTimeout, requestId: opts.requestId, name: opts.name, context: opts.context })
.then(({ statusCode }) => {
let isAlive = true;
if (statusCode === 502 || statusCode === 503 || statusCode === 504) {
debug(`Resurrect: connection '${id}' is still dead`);
this.markDead(connection);
isAlive = false;
}
else {
debug(`Resurrect: connection '${id}' is now alive`);
this.markAlive(connection);
}
this.diagnostic.emit('resurrect', null, {
strategy: 'ping',
name: opts.name,
request: { id: opts.requestId },
isAlive,
connection
});
})
.catch((err) => {
this.markDead(connection);
this.diagnostic.emit('resurrect', err, {
strategy: 'ping',
name: opts.name,
request: { id: opts.requestId },
isAlive: false,
connection
});
});
// optimistic strategy
}
else {
debug(`Resurrect: optimistic resurrection for connection '${id}'`);
this.dead.splice(this.dead.indexOf(id), 1);
connection.status = connection_1.BaseConnection.statuses.ALIVE;
this.diagnostic.emit('resurrect', null, {
strategy: 'optimistic',
name: opts.name,
request: { id: opts.requestId },
isAlive: true,
connection
});
}
}
/**
* Returns an alive connection if present,
* otherwise returns a dead connection.
* By default it filters the `master` only nodes.
* It uses the selector to choose which
* connection return.
*
* @param {object} options (filter and selector)
* @returns {object|null} connection
*/
getConnection(opts) {
const filter = opts.filter != null ? opts.filter : () => true;
const selector = opts.selector != null ? opts.selector : (c) => c[0];
this.resurrect({
now: opts.now,
requestId: opts.requestId,
name: opts.name,
context: opts.context
});
const noAliveConnections = this.size === this.dead.length;
// TODO: can we cache this?
const connections = [];
for (let i = 0; i < this.size; i++) {
const connection = this.connections[i];
if (noAliveConnections || connection.status === connection_1.BaseConnection.statuses.ALIVE) {
if (filter(connection)) {
connections.push(connection);
}
}
}
if (connections.length === 0)
return null;
return selector(connections);
}
/**
* Empties the connection pool.
*
* @returns {ConnectionPool}
*/
async empty() {
await super.empty();
this.dead = [];
}
/**
* Update the ConnectionPool with new connections.
*
* @param {array} array of connections
* @returns {ConnectionPool}
*/
update(connections) {
super.update(connections);
this.dead = [];
return this;
}
}
exports.default = ClusterConnectionPool;
Object.defineProperty(ClusterConnectionPool, "resurrectStrategies", {
enumerable: true,
configurable: true,
writable: true,
value: {
none: 0,
ping: 1,
optimistic: 2
}
});
//# sourceMappingURL=ClusterConnectionPool.js.map

File diff suppressed because one or more lines are too long

View file

@ -0,0 +1,45 @@
import { Connection, ConnectionOptions } from '../connection';
import BaseConnectionPool, { ConnectionPoolOptions, GetConnectionOptions } from './BaseConnectionPool';
export default class WeightedConnectionPool extends BaseConnectionPool {
index: number;
maxWeight: number;
greatestCommonDivisor: number;
currentWeight: number;
constructor(opts: ConnectionPoolOptions);
/**
* Returns a connection, even if the connection might be dead.
*
* @param {object} options (filter)
* @returns {object|null} connection
*/
getConnection(opts: GetConnectionOptions): Connection | null;
/**
* Set the weight of a connection to the maximum value.
* If sniffing is not enabled and there is only
* one node, this method is a noop.
*
* @param {object} connection
*/
markAlive(connection: Connection): this;
/**
* Decreases the connection weight.
* If sniffing is not enabled and there is only
* one node, this method is a noop.
*
* @param {object} connection
*/
markDead(connection: Connection): this;
/**
* Empties the connection pool.
*
* @returns {ConnectionPool}
*/
empty(): Promise<void>;
/**
* Update the ConnectionPool with new connections.
*
* @param {array} array of connections
* @returns {ConnectionPool}
*/
update(connections: Array<Connection | ConnectionOptions>): this;
}

View file

@ -0,0 +1,165 @@
"use strict";
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
Object.defineProperty(exports, "__esModule", { value: true });
const tslib_1 = require("tslib");
const connection_1 = require("../connection");
const BaseConnectionPool_1 = tslib_1.__importDefault(require("./BaseConnectionPool"));
const noFilter = () => true;
class WeightedConnectionPool extends BaseConnectionPool_1.default {
constructor(opts) {
super(opts);
Object.defineProperty(this, "index", {
enumerable: true,
configurable: true,
writable: true,
value: void 0
});
Object.defineProperty(this, "maxWeight", {
enumerable: true,
configurable: true,
writable: true,
value: void 0
});
Object.defineProperty(this, "greatestCommonDivisor", {
enumerable: true,
configurable: true,
writable: true,
value: void 0
});
Object.defineProperty(this, "currentWeight", {
enumerable: true,
configurable: true,
writable: true,
value: void 0
});
// index choosen last time
this.index = -1;
// max weight of all nodes
this.maxWeight = 0;
// greatest common divisor of all nodes weights
this.greatestCommonDivisor = 0;
// current weight in scheduling
this.currentWeight = 0;
}
/**
* Returns a connection, even if the connection might be dead.
*
* @param {object} options (filter)
* @returns {object|null} connection
*/
getConnection(opts) {
const filter = opts.filter != null ? opts.filter : noFilter;
// we should be able to find the next node in 1 array scan,
// if we don't, it means that we are in an infinite loop
let counter = 0;
while (counter++ < this.size) {
// 0 <= index < size
this.index = (this.index + 1) % this.size;
if (this.index === 0) {
this.currentWeight = this.currentWeight - this.greatestCommonDivisor;
if (this.currentWeight <= 0) {
this.currentWeight = this.maxWeight;
/* istanbul ignore if */
if (this.currentWeight === 0) {
return null;
}
}
}
const connection = this.connections[this.index];
if (connection.weight >= this.currentWeight && filter(connection)) {
return connection;
}
}
return null;
}
/**
* Set the weight of a connection to the maximum value.
* If sniffing is not enabled and there is only
* one node, this method is a noop.
*
* @param {object} connection
*/
markAlive(connection) {
if (this.size === 1 || connection.status === connection_1.BaseConnection.statuses.ALIVE)
return this;
connection.status = connection_1.BaseConnection.statuses.ALIVE;
connection.deadCount = 0;
connection.weight = Math.round(1000 / this.size);
this.maxWeight = Math.max(...(this.connections.map(c => c.weight)));
this.greatestCommonDivisor = this.connections.map(c => c.weight).reduce(getGreatestCommonDivisor, 0);
return this;
}
/**
* Decreases the connection weight.
* If sniffing is not enabled and there is only
* one node, this method is a noop.
*
* @param {object} connection
*/
markDead(connection) {
if (this.size === 1)
return this;
connection.status = connection_1.BaseConnection.statuses.DEAD;
connection.deadCount++;
connection.weight -= Math.round(Math.pow(Math.log2(connection.weight), connection.deadCount));
/* istanbul ignore if */
if (connection.weight <= 0)
connection.weight = 1;
this.maxWeight = Math.max(...(this.connections.map(c => c.weight)));
this.greatestCommonDivisor = this.connections.map(c => c.weight).reduce(getGreatestCommonDivisor, 0);
return this;
}
/**
* Empties the connection pool.
*
* @returns {ConnectionPool}
*/
async empty() {
await super.empty();
this.maxWeight = 0;
this.greatestCommonDivisor = 0;
this.index = -1;
this.currentWeight = 0;
}
/**
* Update the ConnectionPool with new connections.
*
* @param {array} array of connections
* @returns {ConnectionPool}
*/
update(connections) {
super.update(connections);
this.connections.forEach(connection => {
connection.weight = Math.round(1000 / this.size);
});
this.maxWeight = Math.max(...(this.connections.map(c => c.weight)));
this.greatestCommonDivisor = this.connections.map(c => c.weight).reduce(getGreatestCommonDivisor, 0);
this.index = -1;
this.currentWeight = 0;
return this;
}
}
exports.default = WeightedConnectionPool;
function getGreatestCommonDivisor(a, b) {
if (b === 0)
return a;
return getGreatestCommonDivisor(b, a % b);
}
//# sourceMappingURL=WeightedConnectionPool.js.map

View file

@ -0,0 +1 @@
{"version":3,"file":"WeightedConnectionPool.js","sourceRoot":"","sources":["../../src/pool/WeightedConnectionPool.ts"],"names":[],"mappings":";AAAA;;;;;;;;;;;;;;;;;GAiBG;;;AAEH,8CAA6E;AAC7E,sFAG6B;AAE7B,MAAM,QAAQ,GAAG,GAAY,EAAE,CAAC,IAAI,CAAA;AAEpC,MAAqB,sBAAuB,SAAQ,4BAAkB;IAMpE,YAAa,IAA2B;QACtC,KAAK,CAAC,IAAI,CAAC,CAAA;QANb;;;;;WAAa;QACb;;;;;WAAiB;QACjB;;;;;WAA6B;QAC7B;;;;;WAAqB;QAInB,0BAA0B;QAC1B,IAAI,CAAC,KAAK,GAAG,CAAC,CAAC,CAAA;QACf,0BAA0B;QAC1B,IAAI,CAAC,SAAS,GAAG,CAAC,CAAA;QAClB,+CAA+C;QAC/C,IAAI,CAAC,qBAAqB,GAAG,CAAC,CAAA;QAC9B,+BAA+B;QAC/B,IAAI,CAAC,aAAa,GAAG,CAAC,CAAA;IACxB,CAAC;IAED;;;;;OAKG;IACH,aAAa,CAAE,IAA0B;QACvC,MAAM,MAAM,GAAG,IAAI,CAAC,MAAM,IAAI,IAAI,CAAC,CAAC,CAAC,IAAI,CAAC,MAAM,CAAC,CAAC,CAAC,QAAQ,CAAA;QAC3D,2DAA2D;QAC3D,wDAAwD;QACxD,IAAI,OAAO,GAAG,CAAC,CAAA;QACf,OAAO,OAAO,EAAE,GAAG,IAAI,CAAC,IAAI,EAAE;YAC5B,oBAAoB;YACpB,IAAI,CAAC,KAAK,GAAG,CAAC,IAAI,CAAC,KAAK,GAAG,CAAC,CAAC,GAAG,IAAI,CAAC,IAAI,CAAA;YACzC,IAAI,IAAI,CAAC,KAAK,KAAK,CAAC,EAAE;gBACpB,IAAI,CAAC,aAAa,GAAG,IAAI,CAAC,aAAa,GAAG,IAAI,CAAC,qBAAqB,CAAA;gBACpE,IAAI,IAAI,CAAC,aAAa,IAAI,CAAC,EAAE;oBAC3B,IAAI,CAAC,aAAa,GAAG,IAAI,CAAC,SAAS,CAAA;oBACnC,wBAAwB;oBACxB,IAAI,IAAI,CAAC,aAAa,KAAK,CAAC,EAAE;wBAC5B,OAAO,IAAI,CAAA;qBACZ;iBACF;aACF;YACD,MAAM,UAAU,GAAG,IAAI,CAAC,WAAW,CAAC,IAAI,CAAC,KAAK,CAAC,CAAA;YAC/C,IAAI,UAAU,CAAC,MAAM,IAAI,IAAI,CAAC,aAAa,IAAI,MAAM,CAAC,UAAU,CAAC,EAAE;gBACjE,OAAO,UAAU,CAAA;aAClB;SACF;QACD,OAAO,IAAI,CAAA;IACb,CAAC;IAED;;;;;;OAMG;IACH,SAAS,CAAE,UAAsB;QAC/B,IAAI,IAAI,CAAC,IAAI,KAAK,CAAC,IAAI,UAAU,CAAC,MAAM,KAAK,2BAAc,CAAC,QAAQ,CAAC,KAAK;YAAE,OAAO,IAAI,CAAA;QAEvF,UAAU,CAAC,MAAM,GAAG,2BAAc,CAAC,QAAQ,CAAC,KAAK,CAAA;QACjD,UAAU,CAAC,SAAS,GAAG,CAAC,CAAA;QACxB,UAAU,CAAC,MAAM,GAAG,IAAI,CAAC,KAAK,CAAC,IAAI,GAAG,IAAI,CAAC,IAAI,CAAC,CAAA;QAEhD,IAAI,CAAC,SAAS,GAAG,IAAI,CAAC,GAAG,CAAC,GAAG,CAAC,IAAI,CAAC,WAAW,CAAC,GAAG,CAAC,CAAC,CAAC,EAAE,CAAC,CAAC,CAAC,MAAM,CAAC,CAAC,CAAC,CAAA;QACnE,IAAI,CAAC,qBAAqB,GAAG,IAAI,CAAC,WAAW,CAAC,GAAG,CAAC,CAAC,CAAC,EAAE,CAAC,CAAC,CAAC,MAAM,CAAC,CAAC,MAAM,CAAC,wBAAwB,EAAE,CAAC,CAAC,CAAA;QAEpG,OAAO,IAAI,CAAA;IACb,CAAC;IAED;;;;;;OAMG;IACH,QAAQ,CAAE,UAAsB;QAC9B,IAAI,IAAI,CAAC,IAAI,KAAK,CAAC;YAAE,OAAO,IAAI,CAAA;QAEhC,UAAU,CAAC,MAAM,GAAG,2BAAc,CAAC,QAAQ,CAAC,IAAI,CAAA;QAChD,UAAU,CAAC,SAAS,EAAE,CAAA;QACtB,UAAU,CAAC,MAAM,IAAI,IAAI,CAAC,KAAK,CAAC,IAAI,CAAC,GAAG,CAAC,IAAI,CAAC,IAAI,CAAC,UAAU,CAAC,MAAM,CAAC,EAAE,UAAU,CAAC,SAAS,CAAC,CAAC,CAAA;QAE7F,wBAAwB;QACxB,IAAI,UAAU,CAAC,MAAM,IAAI,CAAC;YAAE,UAAU,CAAC,MAAM,GAAG,CAAC,CAAA;QAEjD,IAAI,CAAC,SAAS,GAAG,IAAI,CAAC,GAAG,CAAC,GAAG,CAAC,IAAI,CAAC,WAAW,CAAC,GAAG,CAAC,CAAC,CAAC,EAAE,CAAC,CAAC,CAAC,MAAM,CAAC,CAAC,CAAC,CAAA;QACnE,IAAI,CAAC,qBAAqB,GAAG,IAAI,CAAC,WAAW,CAAC,GAAG,CAAC,CAAC,CAAC,EAAE,CAAC,CAAC,CAAC,MAAM,CAAC,CAAC,MAAM,CAAC,wBAAwB,EAAE,CAAC,CAAC,CAAA;QAEpG,OAAO,IAAI,CAAA;IACb,CAAC;IAED;;;;OAIG;IACH,KAAK,CAAC,KAAK;QACT,MAAM,KAAK,CAAC,KAAK,EAAE,CAAA;QACnB,IAAI,CAAC,SAAS,GAAG,CAAC,CAAA;QAClB,IAAI,CAAC,qBAAqB,GAAG,CAAC,CAAA;QAC9B,IAAI,CAAC,KAAK,GAAG,CAAC,CAAC,CAAA;QACf,IAAI,CAAC,aAAa,GAAG,CAAC,CAAA;IACxB,CAAC;IAED;;;;;OAKG;IACH,MAAM,CAAE,WAAkD;QACxD,KAAK,CAAC,MAAM,CAAC,WAAW,CAAC,CAAA;QAEzB,IAAI,CAAC,WAAW,CAAC,OAAO,CAAC,UAAU,CAAC,EAAE;YACpC,UAAU,CAAC,MAAM,GAAG,IAAI,CAAC,KAAK,CAAC,IAAI,GAAG,IAAI,CAAC,IAAI,CAAC,CAAA;QAClD,CAAC,CAAC,CAAA;QACF,IAAI,CAAC,SAAS,GAAG,IAAI,CAAC,GAAG,CAAC,GAAG,CAAC,IAAI,CAAC,WAAW,CAAC,GAAG,CAAC,CAAC,CAAC,EAAE,CAAC,CAAC,CAAC,MAAM,CAAC,CAAC,CAAC,CAAA;QACnE,IAAI,CAAC,qBAAqB,GAAG,IAAI,CAAC,WAAW,CAAC,GAAG,CAAC,CAAC,CAAC,EAAE,CAAC,CAAC,CAAC,MAAM,CAAC,CAAC,MAAM,CAAC,wBAAwB,EAAE,CAAC,CAAC,CAAA;QACpG,IAAI,CAAC,KAAK,GAAG,CAAC,CAAC,CAAA;QACf,IAAI,CAAC,aAAa,GAAG,CAAC,CAAA;QAEtB,OAAO,IAAI,CAAA;IACb,CAAC;CACF;AA7HD,yCA6HC;AAED,SAAS,wBAAwB,CAAE,CAAS,EAAE,CAAS;IACrD,IAAI,CAAC,KAAK,CAAC;QAAE,OAAO,CAAC,CAAA;IACrB,OAAO,wBAAwB,CAAC,CAAC,EAAE,CAAC,GAAG,CAAC,CAAC,CAAA;AAC3C,CAAC"}

View file

@ -0,0 +1,7 @@
import BaseConnectionPool from './BaseConnectionPool';
import WeightedConnectionPool from './WeightedConnectionPool';
import ClusterConnectionPool from './ClusterConnectionPool';
import CloudConnectionPool from './CloudConnectionPool';
export type { ConnectionPoolOptions, GetConnectionOptions } from './BaseConnectionPool';
export type { ResurrectEvent, ResurrectOptions } from './ClusterConnectionPool';
export { BaseConnectionPool, WeightedConnectionPool, ClusterConnectionPool, CloudConnectionPool };

View file

@ -0,0 +1,31 @@
"use strict";
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the 'License'); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* 'AS IS' BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
Object.defineProperty(exports, "__esModule", { value: true });
exports.CloudConnectionPool = exports.ClusterConnectionPool = exports.WeightedConnectionPool = exports.BaseConnectionPool = void 0;
const tslib_1 = require("tslib");
const BaseConnectionPool_1 = tslib_1.__importDefault(require("./BaseConnectionPool"));
exports.BaseConnectionPool = BaseConnectionPool_1.default;
const WeightedConnectionPool_1 = tslib_1.__importDefault(require("./WeightedConnectionPool"));
exports.WeightedConnectionPool = WeightedConnectionPool_1.default;
const ClusterConnectionPool_1 = tslib_1.__importDefault(require("./ClusterConnectionPool"));
exports.ClusterConnectionPool = ClusterConnectionPool_1.default;
const CloudConnectionPool_1 = tslib_1.__importDefault(require("./CloudConnectionPool"));
exports.CloudConnectionPool = CloudConnectionPool_1.default;
//# sourceMappingURL=index.js.map

View file

@ -0,0 +1 @@
{"version":3,"file":"index.js","sourceRoot":"","sources":["../../src/pool/index.ts"],"names":[],"mappings":";AAAA;;;;;;;;;;;;;;;;;GAiBG;;;;AAEH,sFAAqD;AAgBnD,6BAhBK,4BAAkB,CAgBL;AAfpB,8FAA6D;AAgB3D,iCAhBK,gCAAsB,CAgBL;AAfxB,4FAA2D;AAgBzD,gCAhBK,+BAAqB,CAgBL;AAfvB,wFAAuD;AAgBrD,8BAhBK,6BAAmB,CAgBL"}