diff --git a/README.md b/README.md index c93d2fd..333b769 100644 --- a/README.md +++ b/README.md @@ -32,6 +32,34 @@ We aim for full compatibility with the established [sqlite3 API](https://www.npm The package is developed entirely in TypeScript and is fully compatible with JavaScript. It doesn't require any native libraries. This makes it a straightforward and effective tool for managing cloud-based databases in a familiar SQLite environment. +## Publish / Subscribe (Pub/Sub) + +```ts +import { Database } from '@sqlitecloud/drivers' +import { PubSub, PUBSUB_ENTITY_TYPE } from '@sqlitecloud/drivers/lib/drivers/pubsub' + +let database = new Database('sqlitecloud://user:password@xxx.sqlite.cloud:8860/chinook.sqlite') +// or use sqlitecloud://xxx.sqlite.cloud:8860?apikey=xxxxxxx + +const pubSub: PubSub = await database.getPubSub() + +await pubSub.listen(PUBSUB_ENTITY_TYPE.TABLE, 'albums', (error, results, data) => { + if (results) { + // Changes on albums table will be received here as JSON object + console.log('Received message:', results) + } +}) + +await database.sql`INSERT INTO albums (Title, ArtistId) values ('Brand new song', 1)` + +// Stop listening changes on the table +await pubSub.unlisten(PUBSUB_ENTITY_TYPE.TABLE, 'albums') +``` + +Pub/Sub is a messaging pattern that allows multiple applications to communicate with each other asynchronously. In the context of SQLiteCloud, Pub/Sub can be used to provide real-time updates and notifications to subscribed applications whenever data changes in the database or it can be used to send payloads (messages) to anyone subscribed to a channel. + +Pub/Sub Documentation: [https://docs.sqlitecloud.io/docs/pub-sub](https://docs.sqlitecloud.io/docs/pub-sub) + ## More How do I deploy SQLite in the cloud? diff --git a/package.json b/package.json index f42af2b..c24bbea 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@sqlitecloud/drivers", - "version": "1.0.178", + "version": "1.0.193", "description": "SQLiteCloud drivers for Typescript/Javascript in edge, web and node clients", "main": "./lib/index.js", "types": "./lib/index.d.ts", diff --git a/src/drivers/connection-tls.ts b/src/drivers/connection-tls.ts index 4cfbc1f..d9c0035 100644 --- a/src/drivers/connection-tls.ts +++ b/src/drivers/connection-tls.ts @@ -207,8 +207,9 @@ export class SQLiteCloudTlsConnection extends SQLiteCloudConnection { if (this.processCallback) { this.processCallback(error, result) - // this.processCallback = undefined } + + this.buffer = Buffer.alloc(0) } /** Disconnect immediately, release connection, no events. */ diff --git a/src/drivers/database.ts b/src/drivers/database.ts index 7edc2c2..c67bb28 100644 --- a/src/drivers/database.ts +++ b/src/drivers/database.ts @@ -18,6 +18,7 @@ import { Statement } from './statement' import { ErrorCallback, ResultsCallback, RowCallback, RowsCallback } from './types' import EventEmitter from 'eventemitter3' import { isBrowser } from './utilities' +import { PubSub } from './pubsub' // Uses eventemitter3 instead of node events for browser compatibility // https://github.com/primus/eventemitter3 @@ -483,4 +484,24 @@ export class Database extends EventEmitter { }) }) } + + /** + * PubSub class provides a Pub/Sub real-time updates and notifications system to + * allow multiple applications to communicate with each other asynchronously. + * It allows applications to subscribe to tables and receive notifications whenever + * data changes in the database table. It also enables sending messages to anyone + * subscribed to a specific channel. + * @returns {PubSub} A PubSub object + */ + public async getPubSub(): Promise { + return new Promise((resolve, reject) => { + this.getConnection((error, connection) => { + if (error || !connection) { + reject(error) + } else { + resolve(new PubSub(connection)) + } + }) + }) + } } diff --git a/src/drivers/protocol.ts b/src/drivers/protocol.ts index 79750ad..fd45b5f 100644 --- a/src/drivers/protocol.ts +++ b/src/drivers/protocol.ts @@ -25,7 +25,7 @@ export const CMD_COMPRESSED = '%' export const CMD_COMMAND = '^' export const CMD_ARRAY = '=' // const CMD_RAWJSON = '{' -// const CMD_PUBSUB = '|' +export const CMD_PUBSUB = '|' // const CMD_RECONNECT = '@' // To mark the end of the Rowset, the special string /LEN 0 0 0 is sent (LEN is always 6 in this case) @@ -298,6 +298,8 @@ export function popData(buffer: Buffer): { data: SQLiteCloudDataTypes | SQLiteCl return popResults(buffer.subarray(spaceIndex + 1, commandEnd - 1).toString('utf8')) case CMD_COMMAND: return popResults(buffer.subarray(spaceIndex + 1, commandEnd).toString('utf8')) + case CMD_PUBSUB: + return popResults(buffer.subarray(spaceIndex + 1, commandEnd).toString('utf8')) case CMD_JSON: return popResults(JSON.parse(buffer.subarray(spaceIndex + 1, commandEnd).toString('utf8'))) case CMD_BLOB: diff --git a/src/drivers/pubsub.ts b/src/drivers/pubsub.ts new file mode 100644 index 0000000..bb57788 --- /dev/null +++ b/src/drivers/pubsub.ts @@ -0,0 +1,109 @@ +import { SQLiteCloudConnection } from './connection' +import SQLiteCloudTlsConnection from './connection-tls' +import { PubSubCallback } from './types' + +export enum PUBSUB_ENTITY_TYPE { + TABLE = 'TABLE', + CHANNEL = 'CHANNEL' +} + +/** + * Pub/Sub class to receive changes on database tables or to send messages to channels. + */ +export class PubSub { + constructor(connection: SQLiteCloudConnection) { + this.connection = connection + this.connectionPubSub = new SQLiteCloudTlsConnection(connection.getConfig()) + } + + private connection: SQLiteCloudConnection + private connectionPubSub: SQLiteCloudConnection + + /** + * Listen for a table or channel and start to receive messages to the provided callback. + * @param entityType One of TABLE or CHANNEL' + * @param entityName Name of the table or the channel + * @param callback Callback to be called when a message is received + * @param data Extra data to be passed to the callback + */ + public async listen(entityType: PUBSUB_ENTITY_TYPE, entityName: string, callback: PubSubCallback, data?: any): Promise { + const entity = entityType === 'TABLE' ? 'TABLE ' : '' + + const authCommand: string = await this.connection.sql(`LISTEN ${entity}${entityName};`) + + return new Promise((resolve, reject) => { + this.connectionPubSub.sendCommands(authCommand, (error, results) => { + if (error) { + callback.call(this, error, null, data) + reject(error) + } else { + // skip results from pubSub auth command + if (results !== 'OK') { + callback.call(this, null, results, data) + } + resolve(results) + } + }) + }) + } + + /** + * Stop receive messages from a table or channel. + * @param entityType One of TABLE or CHANNEL + * @param entityName Name of the table or the channel + */ + public async unlisten(entityType: string, entityName: string): Promise { + const subject = entityType === 'TABLE' ? 'TABLE ' : '' + + return this.connection.sql(`UNLISTEN ${subject}?;`, entityName) + } + + /** + * Create a channel to send messages to. + * @param name Channel name + * @param failIfExists Raise an error if the channel already exists + */ + public async createChannel(name: string, failIfExists: boolean = true): Promise { + let notExistsCommand = '' + if (!failIfExists) { + notExistsCommand = 'IF NOT EXISTS;' + } + + return this.connection.sql(`CREATE CHANNEL ? ${notExistsCommand}`, name) + } + + /** + * Send a message to the channel. + */ + public notifyChannel(channelName: string, message: string): Promise { + return this.connection.sql`NOTIFY ${channelName} ${message};` + } + + /** + * Ask the server to close the connection to the database and + * to keep only open the Pub/Sub connection. + * Only interaction with Pub/Sub commands will be allowed. + */ + public setPubSubOnly(): Promise { + return new Promise((resolve, reject) => { + this.connection.sendCommands('PUBSUB ONLY;', (error, results) => { + if (error) { + reject(error) + } else { + this.connection.close() + resolve(results) + } + }) + }) + } + + /** True if Pub/Sub connection is open. */ + public connected(): boolean { + return this.connectionPubSub.connected + } + + /** Close Pub/Sub connection. */ + public close(): void { + this.connectionPubSub.close() + } +} diff --git a/src/drivers/types.ts b/src/drivers/types.ts index 0df93a7..68d2a22 100644 --- a/src/drivers/types.ts +++ b/src/drivers/types.ts @@ -129,6 +129,7 @@ export type ResultsCallback = (error: Error | null, results?: T) => voi export type RowsCallback> = (error: Error | null, rows?: T[]) => void export type RowCallback> = (error: Error | null, row?: T) => void export type RowCountCallback = (error: Error | null, rowCount?: number) => void +export type PubSubCallback = (error: Error | null, results?: T, extraData?: T) => void /** * Certain responses include arrays with various types of metadata. diff --git a/test/pubsub.test.ts b/test/pubsub.test.ts new file mode 100644 index 0000000..e7e0369 --- /dev/null +++ b/test/pubsub.test.ts @@ -0,0 +1,211 @@ +import { SQLiteCloudRow } from '../src' +import { getChinookDatabase, LONG_TIMEOUT } from './shared' +import { PUBSUB_ENTITY_TYPE } from '../src/drivers/pubsub' + +describe('pubSub', () => { + it( + 'should listen, notify and receive pubSub messages on channel', + async () => { + const connection = getChinookDatabase() + const pubSub = await connection.getPubSub() + + try { + let callbackCalled = false + const channelName = 'test-channel-' + Math.floor(Math.random() * 999) + const message = 'Message in a bottle ' + Math.floor(Math.random() * 999) + + await pubSub.createChannel(channelName) + + await pubSub.listen( + PUBSUB_ENTITY_TYPE.CHANNEL, + channelName, + (error, results, data) => { + expect(error).toBeNull() + + expect(results).not.toBeNull() + expect(results['channel']).toEqual(channelName) + expect(results['payload']).toEqual(message) + expect(data).toEqual({ pippo: 'pluto' }) + callbackCalled = true + }, + { pippo: 'pluto' } + ) + + await pubSub.notifyChannel(channelName, message) + + while (!callbackCalled) { + await new Promise(resolve => setTimeout(resolve, 1000)) + } + + expect(callbackCalled).toBeTruthy() + } finally { + connection.close() + pubSub.close() + } + }, + LONG_TIMEOUT + ), + it('should unlisten on channel', async () => { + const connection = getChinookDatabase() + const pubSub = await connection.getPubSub() + + try { + const channelName = 'test-channel-' + Math.floor(Math.random() * 999) + + await pubSub.createChannel(channelName) + + await pubSub.listen(PUBSUB_ENTITY_TYPE.CHANNEL, channelName, (error, results, data) => { + expect(true).toBeFalsy() + }) + + let connections = await connection.sql`LIST PUBSUB CONNECTIONS;` + let connectionExists = connections.find((row: SQLiteCloudRow) => row['chname'] === channelName) + expect(connectionExists).toBeDefined() + + await pubSub.unlisten(PUBSUB_ENTITY_TYPE.CHANNEL, channelName) + + connections = await connection.sql`LIST PUBSUB CONNECTIONS;` + connectionExists = connections.find((row: SQLiteCloudRow) => row['chname'] === channelName) + expect(connectionExists).toBeUndefined() + } finally { + connection.close() + pubSub.close() + } + }), + it('should unlisten on table', async () => { + const connection = getChinookDatabase() + const pubSub = await connection.getPubSub() + + try { + let callbackCalled = false + + const tableName = 'genres' + await pubSub.listen(PUBSUB_ENTITY_TYPE.TABLE, tableName, (error, results, data) => { + expect(true).toBeFalsy() + callbackCalled = true + }) + + let connections = await connection.sql`LIST PUBSUB CONNECTIONS;` + let connectionExists = connections.find((row: SQLiteCloudRow) => row['chname'] === tableName) + expect(connectionExists).toBeDefined() + + await pubSub.unlisten(PUBSUB_ENTITY_TYPE.TABLE, tableName) + + await connection.sql`UPDATE genres SET Name = 'Rock' WHERE GenreId = 1` + + // wait a moment to see if the callback is called + await new Promise(resolve => setTimeout(resolve, 2000)) + + expect(callbackCalled).toBeFalsy() + } finally { + connection.close() + pubSub.close() + } + }), + it('should fail to create a channel that already exists', async () => { + const connection = getChinookDatabase() + const pubSub = await connection.getPubSub() + + try { + const channelName = 'test-channel-' + Math.floor(Math.random() * 999) + + await pubSub.createChannel(channelName) + + await expect(pubSub.createChannel(channelName, true)).rejects.toThrow(`Cannot create channel ${channelName} because it already exists.`) + } finally { + connection.close() + pubSub.close() + } + }), + it( + 'should listen and receive pubSub messages on table', + async () => { + const connection = getChinookDatabase() + const pubSub = await connection.getPubSub() + + try { + let callbackCalled = false + const newName = 'Rock' + Math.floor(Math.random() * 999) + + await pubSub.listen( + PUBSUB_ENTITY_TYPE.TABLE, + 'genres', + (error, results, data) => { + expect(error).toBeNull() + + expect(results).not.toBeNull() + expect(results['payload'][0]['type']).toEqual('UPDATE') + expect(results['payload'][0]['Name']).toEqual(newName) + expect(data).toEqual({ pippo: 'pluto' }) + callbackCalled = true + }, + { pippo: 'pluto' } + ) + + await connection.sql`UPDATE genres SET Name = ${newName} WHERE GenreId = 1` + + while (!callbackCalled) { + await new Promise(resolve => setTimeout(resolve, 1000)) + } + + expect(callbackCalled).toBeTruthy() + } finally { + connection.close() + pubSub.close() + } + }, + LONG_TIMEOUT + ), + it('should be connected', async () => { + const connection = getChinookDatabase() + const pubSub = await connection.getPubSub() + + try { + expect(pubSub.connected()).toBeTruthy() + + pubSub.close() + + expect(pubSub.connected()).toBeFalsy() + } finally { + connection.close() + pubSub.close() + } + }), + it( + 'should keep pubSub only connection', + async () => { + const connection = getChinookDatabase() + const connection2 = getChinookDatabase() + const pubSub = await connection.getPubSub() + + try { + let callbackCalled = false + const newName = 'Rock' + Math.floor(Math.random() * 999) + + await pubSub.listen(PUBSUB_ENTITY_TYPE.TABLE, 'genres', (error, results, data) => { + expect(error).toBeNull() + expect(results).not.toBeNull() + callbackCalled = true + }) + + await pubSub.setPubSubOnly() + + expect(connection.sql`SELECT 1`).rejects.toThrow('Connection not established') + expect(pubSub.connected()).toBeTruthy() + + await connection2.sql`UPDATE genres SET Name = ${newName} WHERE GenreId = 1` + + while (!callbackCalled) { + await new Promise(resolve => setTimeout(resolve, 1000)) + } + + expect(callbackCalled).toBeTruthy() + } finally { + connection.close() + pubSub.close() + connection2.close() + } + }, + LONG_TIMEOUT + ) +})