Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,34 @@ We aim for full compatibility with the established [sqlite3 API](https://www.npm

The package is developed entirely in TypeScript and is fully compatible with JavaScript. It doesn't require any native libraries. This makes it a straightforward and effective tool for managing cloud-based databases in a familiar SQLite environment.

## Publish / Subscribe (Pub/Sub)

```ts
import { Database } from '@sqlitecloud/drivers'
import { PubSub, PUBSUB_ENTITY_TYPE } from '@sqlitecloud/drivers/lib/drivers/pubsub'

let database = new Database('sqlitecloud://user:password@xxx.sqlite.cloud:8860/chinook.sqlite')
// or use sqlitecloud://xxx.sqlite.cloud:8860?apikey=xxxxxxx

const pubSub: PubSub = await database.getPubSub()

await pubSub.listen(PUBSUB_ENTITY_TYPE.TABLE, 'albums', (error, results, data) => {
if (results) {
// Changes on albums table will be received here as JSON object
console.log('Received message:', results)
}
})

await database.sql`INSERT INTO albums (Title, ArtistId) values ('Brand new song', 1)`

// Stop listening changes on the table
await pubSub.unlisten(PUBSUB_ENTITY_TYPE.TABLE, 'albums')
```

Pub/Sub is a messaging pattern that allows multiple applications to communicate with each other asynchronously. In the context of SQLiteCloud, Pub/Sub can be used to provide real-time updates and notifications to subscribed applications whenever data changes in the database or it can be used to send payloads (messages) to anyone subscribed to a channel.

Pub/Sub Documentation: [https://docs.sqlitecloud.io/docs/pub-sub](https://docs.sqlitecloud.io/docs/pub-sub)

## More

How do I deploy SQLite in the cloud?
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@sqlitecloud/drivers",
"version": "1.0.178",
"version": "1.0.193",
"description": "SQLiteCloud drivers for Typescript/Javascript in edge, web and node clients",
"main": "./lib/index.js",
"types": "./lib/index.d.ts",
Expand Down
3 changes: 2 additions & 1 deletion src/drivers/connection-tls.ts
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,9 @@ export class SQLiteCloudTlsConnection extends SQLiteCloudConnection {

if (this.processCallback) {
this.processCallback(error, result)
// this.processCallback = undefined
}

this.buffer = Buffer.alloc(0)
}

/** Disconnect immediately, release connection, no events. */
Expand Down
21 changes: 21 additions & 0 deletions src/drivers/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import { Statement } from './statement'
import { ErrorCallback, ResultsCallback, RowCallback, RowsCallback } from './types'
import EventEmitter from 'eventemitter3'
import { isBrowser } from './utilities'
import { PubSub } from './pubsub'

// Uses eventemitter3 instead of node events for browser compatibility
// https://github.com/primus/eventemitter3
Expand Down Expand Up @@ -483,4 +484,24 @@ export class Database extends EventEmitter {
})
})
}

/**
* PubSub class provides a Pub/Sub real-time updates and notifications system to
* allow multiple applications to communicate with each other asynchronously.
* It allows applications to subscribe to tables and receive notifications whenever
* data changes in the database table. It also enables sending messages to anyone
* subscribed to a specific channel.
* @returns {PubSub} A PubSub object
*/
public async getPubSub(): Promise<PubSub> {
return new Promise((resolve, reject) => {
this.getConnection((error, connection) => {
if (error || !connection) {
reject(error)
} else {
resolve(new PubSub(connection))
}
})
})
}
}
4 changes: 3 additions & 1 deletion src/drivers/protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ export const CMD_COMPRESSED = '%'
export const CMD_COMMAND = '^'
export const CMD_ARRAY = '='
// const CMD_RAWJSON = '{'
// const CMD_PUBSUB = '|'
export const CMD_PUBSUB = '|'
// const CMD_RECONNECT = '@'

// To mark the end of the Rowset, the special string /LEN 0 0 0 is sent (LEN is always 6 in this case)
Expand Down Expand Up @@ -298,6 +298,8 @@ export function popData(buffer: Buffer): { data: SQLiteCloudDataTypes | SQLiteCl
return popResults(buffer.subarray(spaceIndex + 1, commandEnd - 1).toString('utf8'))
case CMD_COMMAND:
return popResults(buffer.subarray(spaceIndex + 1, commandEnd).toString('utf8'))
case CMD_PUBSUB:
return popResults(buffer.subarray(spaceIndex + 1, commandEnd).toString('utf8'))
case CMD_JSON:
return popResults(JSON.parse(buffer.subarray(spaceIndex + 1, commandEnd).toString('utf8')))
case CMD_BLOB:
Expand Down
109 changes: 109 additions & 0 deletions src/drivers/pubsub.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import { SQLiteCloudConnection } from './connection'
import SQLiteCloudTlsConnection from './connection-tls'
import { PubSubCallback } from './types'

export enum PUBSUB_ENTITY_TYPE {
TABLE = 'TABLE',
CHANNEL = 'CHANNEL'
}

/**
* Pub/Sub class to receive changes on database tables or to send messages to channels.
*/
export class PubSub {
constructor(connection: SQLiteCloudConnection) {
this.connection = connection
this.connectionPubSub = new SQLiteCloudTlsConnection(connection.getConfig())
}

private connection: SQLiteCloudConnection
private connectionPubSub: SQLiteCloudConnection

/**
* Listen for a table or channel and start to receive messages to the provided callback.
* @param entityType One of TABLE or CHANNEL'
* @param entityName Name of the table or the channel
* @param callback Callback to be called when a message is received
* @param data Extra data to be passed to the callback
*/
public async listen(entityType: PUBSUB_ENTITY_TYPE, entityName: string, callback: PubSubCallback, data?: any): Promise<any> {
const entity = entityType === 'TABLE' ? 'TABLE ' : ''

const authCommand: string = await this.connection.sql(`LISTEN ${entity}${entityName};`)

return new Promise((resolve, reject) => {
this.connectionPubSub.sendCommands(authCommand, (error, results) => {
if (error) {
callback.call(this, error, null, data)
reject(error)
} else {
// skip results from pubSub auth command
if (results !== 'OK') {
callback.call(this, null, results, data)
}
resolve(results)
}
})
})
}

/**
* Stop receive messages from a table or channel.
* @param entityType One of TABLE or CHANNEL
* @param entityName Name of the table or the channel
*/
public async unlisten(entityType: string, entityName: string): Promise<any> {
const subject = entityType === 'TABLE' ? 'TABLE ' : ''

return this.connection.sql(`UNLISTEN ${subject}?;`, entityName)
}

/**
* Create a channel to send messages to.
* @param name Channel name
* @param failIfExists Raise an error if the channel already exists
*/
public async createChannel(name: string, failIfExists: boolean = true): Promise<any> {
let notExistsCommand = ''
if (!failIfExists) {
notExistsCommand = 'IF NOT EXISTS;'
}

return this.connection.sql(`CREATE CHANNEL ? ${notExistsCommand}`, name)
}

/**
* Send a message to the channel.
*/
public notifyChannel(channelName: string, message: string): Promise<any> {
return this.connection.sql`NOTIFY ${channelName} ${message};`
}

/**
* Ask the server to close the connection to the database and
* to keep only open the Pub/Sub connection.
* Only interaction with Pub/Sub commands will be allowed.
*/
public setPubSubOnly(): Promise<any> {
return new Promise((resolve, reject) => {
this.connection.sendCommands('PUBSUB ONLY;', (error, results) => {
if (error) {
reject(error)
} else {
this.connection.close()
resolve(results)
}
})
})
}

/** True if Pub/Sub connection is open. */
public connected(): boolean {
return this.connectionPubSub.connected
}

/** Close Pub/Sub connection. */
public close(): void {
this.connectionPubSub.close()
}
}
1 change: 1 addition & 0 deletions src/drivers/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ export type ResultsCallback<T = any> = (error: Error | null, results?: T) => voi
export type RowsCallback<T = Record<string, any>> = (error: Error | null, rows?: T[]) => void
export type RowCallback<T = Record<string, any>> = (error: Error | null, row?: T) => void
export type RowCountCallback = (error: Error | null, rowCount?: number) => void
export type PubSubCallback<T = any> = (error: Error | null, results?: T, extraData?: T) => void

/**
* Certain responses include arrays with various types of metadata.
Expand Down
Loading