API Reference Source

lib/dialects/abstract/connection-manager.js

  1. 'use strict';
  2.  
  3. const { Pool, TimeoutError } = require('sequelize-pool');
  4. const _ = require('lodash');
  5. const semver = require('semver');
  6. const Promise = require('../../promise');
  7. const errors = require('../../errors');
  8. const { logger } = require('../../utils/logger');
  9. const debug = logger.debugContext('pool');
  10.  
  11. /**
  12. * Abstract Connection Manager
  13. *
  14. * Connection manager which handles pooling & replication.
  15. * Uses sequelize-pool for pooling
  16. *
  17. * @private
  18. */
  class ConnectionManager {
    /**
     * Clone the Sequelize configuration, apply pool defaults and create the pool(s).
     *
     * @param {Object} dialect Dialect object; its `connectionManager` performs the actual connect/disconnect/validate calls
     * @param {Object} sequelize Sequelize instance providing `config`, `options` and the connection hooks
     *
     * @throws {Error} When `config.pool` is explicitly `false`
     */
    constructor(dialect, sequelize) {
      // Work on a private copy: initPools() mutates the replication settings.
      const config = _.cloneDeep(sequelize.config);

      this.sequelize = sequelize;
      this.config = config;
      this.dialect = dialect;
      // In-flight database version probe, shared by concurrent getConnection() calls.
      this.versionPromise = null;
      this.dialectName = this.sequelize.options.dialect;

      if (config.pool === false) {
        throw new Error('Support for pool:false was removed in v4.0');
      }

      // `idle`, `acquire` and `evict` are forwarded to the pool as milliseconds (see initPools).
      config.pool = _.defaults(config.pool || {}, {
        max: 5,
        min: 0,
        idle: 10000,
        acquire: 60000,
        evict: 1000,
        validate: this._validate.bind(this)
      });

      this.initPools();
    }

    /**
     * Re-register the parser of every data type that declares its own `parse` function.
     *
     * @param {Object} dataTypes Collection of data types to inspect
     *
     * @throws {Error} When a type has a `parse` function but no entry for the current dialect in `types`
     */
    refreshTypeParser(dataTypes) {
      _.each(dataTypes, dataType => {
        if (Object.prototype.hasOwnProperty.call(dataType, 'parse')) {
          if (dataType.types[this.dialectName]) {
            // `_refreshTypeParser` is not defined in this class; presumably supplied by the dialect subclass.
            this._refreshTypeParser(dataType);
          } else {
            throw new Error(`Parse function not supported for type ${dataType.key} in dialect ${this.dialectName}`);
          }
        }
      });
    }

    /**
     * Try to load dialect module from various configured options.
     * Priority goes like dialectModulePath > dialectModule > require(default)
     *
     * @param {string} moduleName Name of dialect module to lookup
     *
     * @private
     * @returns {Object}
     * @throws {Error} With an installation hint when the module cannot be found; any other error is rethrown as-is
     */
    _loadDialectModule(moduleName) {
      try {
        if (this.sequelize.config.dialectModulePath) {
          return require(this.sequelize.config.dialectModulePath);
        }
        if (this.sequelize.config.dialectModule) {
          return this.sequelize.config.dialectModule;
        }
        // This is needed so that bundlers (like webpack) know which library to include in the bundle
        switch (moduleName) {
          case 'pg': return require('pg');
          case 'mysql2': return require('mysql2');
          case 'mariadb': return require('mariadb');
          case 'sqlite3': return require('sqlite3');
          case 'tedious': return require('tedious');
          default: return require(moduleName);
        }
      } catch (err) {
        if (err.code === 'MODULE_NOT_FOUND') {
          if (this.sequelize.config.dialectModulePath) {
            throw new Error(`Unable to find dialect at ${this.sequelize.config.dialectModulePath}`);
          }
          throw new Error(`Please install ${moduleName} package manually`);
        }

        throw err;
      }
    }

    /**
     * Handler which executes on process exit or connection manager shutdown.
     * Drains the pool and then destroys all of its connections.
     *
     * @private
     * @returns {Promise}
     */
    _onProcessExit() {
      if (!this.pool) {
        return Promise.resolve();
      }

      return this.pool.drain().then(() => {
        debug('connection drain due to process exit');
        return this.pool.destroyAllNow();
      });
    }

    /**
     * Drain the pool and close it permanently
     *
     * @returns {Promise}
     */
    close() {
      // Mark close of pool: shadow the prototype method so later calls on this instance are rejected.
      this.getConnection = function getConnection() {
        return Promise.reject(new Error('ConnectionManager.getConnection was called after the connection manager was closed!'));
      };

      return this._onProcessExit();
    }

    /**
     * Initialize connection pool. By default pool autostart is set to false, so no connection will
     * be created unless `pool.acquire` is called.
     *
     * Without replication a single pool is created. With replication, `this.pool` becomes a facade
     * over a `read` pool (round robin across the configured read replicas) and a `write` pool.
     */
    initPools() {
      const config = this.config;

      // Settings common to every Pool instance created below.
      const sharedPoolOptions = {
        validate: config.pool.validate,
        max: config.pool.max,
        min: config.pool.min,
        acquireTimeoutMillis: config.pool.acquire,
        idleTimeoutMillis: config.pool.idle,
        reapIntervalMillis: config.pool.evict
      };

      if (!config.replication) {
        this.pool = new Pool(Object.assign({
          name: 'sequelize',
          create: () => this._connect(config),
          destroy: connection => {
            return this._disconnect(connection)
              .tap(() => { debug('connection destroy'); });
          }
        }, sharedPoolOptions));

        debug(`pool created with max/min: ${config.pool.max}/${config.pool.min}, no replication`);

        return;
      }

      if (!Array.isArray(config.replication.read)) {
        config.replication.read = [config.replication.read];
      }

      // Settings inherited by the write config and by every read config.
      const baseConfig = _.omit(config, 'replication');

      // Map main connection config
      config.replication.write = _.defaults(config.replication.write, baseConfig);

      // Apply defaults to each read config
      config.replication.read = config.replication.read.map(readConfig =>
        _.defaults(readConfig, baseConfig)
      );

      // custom pooling for replication (original author @janmeier)
      let reads = 0;
      this.pool = {
        // Connections are tagged with `queryType` on creation so they return to the pool they came from.
        release: client => {
          if (client.queryType === 'read') {
            this.pool.read.release(client);
          } else {
            this.pool.write.release(client);
          }
        },
        // Only 'SELECT' goes to the read pool, and only when useMaster is falsy (undefined included).
        acquire: (queryType, useMaster) => {
          if (queryType === 'SELECT' && !useMaster) {
            return this.pool.read.acquire();
          }
          return this.pool.write.acquire();
        },
        destroy: connection => {
          this.pool[connection.queryType].destroy(connection);
          debug('connection destroy');
        },
        destroyAllNow: () => {
          return Promise.join(
            this.pool.read.destroyAllNow(),
            this.pool.write.destroyAllNow()
          ).tap(() => { debug('all connections destroyed'); });
        },
        drain: () => {
          return Promise.join(
            this.pool.write.drain(),
            this.pool.read.drain()
          );
        },
        read: new Pool(Object.assign({
          name: 'sequelize:read',
          create: () => {
            // round robin config
            const nextRead = reads++ % config.replication.read.length;
            return this._connect(config.replication.read[nextRead]).tap(connection => {
              connection.queryType = 'read';
            });
          },
          destroy: connection => this._disconnect(connection)
        }, sharedPoolOptions)),
        write: new Pool(Object.assign({
          name: 'sequelize:write',
          create: () => {
            return this._connect(config.replication.write).tap(connection => {
              connection.queryType = 'write';
            });
          },
          destroy: connection => this._disconnect(connection)
        }, sharedPoolOptions))
      };

      debug(`pool created with max/min: ${config.pool.max}/${config.pool.min}, with replication`);
    }

    /**
     * Get connection from pool. It sets database version if it's not already set.
     * Call pool.acquire to get a connection
     *
     * @param {Object} [options] Pool options
     * @param {string} [options.type] Query type, passed to `pool.acquire`. With replication enabled, `'SELECT'` is served by the read pool and every other value by the write pool
     * @param {boolean} [options.useMaster=false] With replication enabled, force the write pool even for `'SELECT'`
     *
     * @returns {Promise<Connection>} Rejects with `ConnectionAcquireTimeoutError` when the pool reports a `TimeoutError`
     */
    getConnection(options) {
      options = options || {};

      let ready;
      if (this.sequelize.options.databaseVersion !== 0) {
        ready = Promise.resolve();
      } else if (this.versionPromise) {
        // A probe is already running; wait for it instead of opening another connection.
        ready = this.versionPromise;
      } else {
        ready = this.versionPromise = this._probeDatabaseVersion();
      }

      return ready
        .then(() => {
          return this.pool.acquire(options.type, options.useMaster)
            .catch(error => {
              if (error instanceof TimeoutError) throw new errors.ConnectionAcquireTimeoutError(error);
              throw error;
            });
        })
        .then(connection => {
          debug('connection acquired');
          return connection;
        });
    }

    /**
     * Open a dedicated connection, store the server version in
     * `sequelize.options.databaseVersion` and close the connection again.
     * The connection is closed even when the version query fails, so it cannot leak.
     * `this.versionPromise` is reset once the probe has finished, successfully or not.
     *
     * @private
     * @returns {Promise}
     */
    _probeDatabaseVersion() {
      // `replication` may be false or missing altogether when replication is not configured.
      const replication = this.config.replication;
      const connectConfig = replication && replication.write || this.config;

      return this._connect(connectConfig)
        .then(connection => {
          const queryOptions = {};

          queryOptions.transaction = { connection }; // Cheat .query to use our private connection
          queryOptions.logging = () => {};
          queryOptions.logging.__testLoggingFn = true;

          // Wrapped in a promise chain so that a synchronous throw is handled like a rejection.
          const versionStored = Promise.resolve().then(() => {
            // connection might have set databaseVersion value at initialization,
            // avoiding a useless round trip
            if (this.sequelize.options.databaseVersion !== 0) {
              return null;
            }

            return this.sequelize.databaseVersion(queryOptions).then(version => {
              const parsedVersion = _.get(semver.coerce(version), 'version') || version;
              this.sequelize.options.databaseVersion = semver.valid(parsedVersion)
                ? parsedVersion
                : this.defaultVersion;
            });
          });

          return versionStored.then(
            () => {
              this.versionPromise = null;
              return this._disconnect(connection);
            },
            err => {
              // Close the probe connection, but report the original failure to the caller.
              return Promise.resolve()
                .then(() => this._disconnect(connection))
                .catch(disconnectError => {
                  debug(`failed to close version probe connection: ${disconnectError.message}`);
                })
                .then(() => { throw err; });
            }
          );
        })
        .catch(err => {
          // Allow the next getConnection() call to retry the probe.
          this.versionPromise = null;
          throw err;
        });
    }

    /**
     * Release a pooled connection so it can be utilized by other connection requests
     *
     * @param {Connection} connection
     *
     * @returns {Promise}
     */
    releaseConnection(connection) {
      // Promise.try turns a synchronous throw from pool.release into a rejection.
      return Promise.try(() => {
        this.pool.release(connection);
        debug('connection released');
      });
    }

    /**
     * Call dialect library to get connection.
     * Runs the `beforeConnect` hook before and the `afterConnect` hook after connecting.
     *
     * @param {*} config Connection config
     * @private
     * @returns {Promise<Connection>}
     */
    _connect(config) {
      return this.sequelize.runHooks('beforeConnect', config)
        .then(() => this.dialect.connectionManager.connect(config))
        .then(connection => this.sequelize.runHooks('afterConnect', connection, config).return(connection));
    }

    /**
     * Call dialect library to disconnect a connection.
     * Runs the `beforeDisconnect` hook before and the `afterDisconnect` hook after disconnecting.
     *
     * @param {Connection} connection
     * @private
     * @returns {Promise}
     */
    _disconnect(connection) {
      return this.sequelize.runHooks('beforeDisconnect', connection)
        .then(() => this.dialect.connectionManager.disconnect(connection))
        .then(() => this.sequelize.runHooks('afterDisconnect', connection));
    }

    /**
     * Determine if a connection is still valid or not.
     * Connections are considered valid when the dialect provides no `validate` function.
     *
     * @param {Connection} connection
     *
     * @returns {boolean}
     */
    _validate(connection) {
      if (!this.dialect.connectionManager.validate) {
        return true;
      }

      return this.dialect.connectionManager.validate(connection);
    }
  }
  350.  
  351. module.exports = ConnectionManager;
  352. module.exports.ConnectionManager = ConnectionManager;
  353. module.exports.default = ConnectionManager;