diff --git a/CHANGELOG.md b/CHANGELOG.md index 12d3bd65dc5..58d02d2ea3e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ +- ADD tutorial on [how to start a HTTP replication with a custom server](https://rxdb.info/replication-http.html) - FIX `.count()` broken on key-compression plugin [#5492](https://github.com/pubkey/rxdb/pull/5492) - UPDATE dexie to version `4.0.1-beta.6` [#5469](https://github.com/pubkey/rxdb/pull/5469) diff --git a/README.md b/README.md index 19e06a22355..0033c178c7a 100644 --- a/README.md +++ b/README.md @@ -61,7 +61,7 @@ RxDB (short for Reactive Database) is a local-first, NoSQL-database for JavaScript Applications like Websites, hybrid Apps, Electron-Apps, Progressive Web Apps, Deno and Node.js. Reactive means that you can not only query the current state, but subscribe to all state changes like the result of a query or even a single field of a document. This is great for UI-based realtime applications in a way that makes it easy to develop and also has great performance benefits but can also be used to create fast backends in Node.js.
- RxDB provides an easy to implement protocol for realtime replication with your existing infrastructure or one of the plugins for GraphQL, CouchDB, Websocket, WebRTC, Firestore, NATS.
+ RxDB provides an easy to implement protocol for realtime replication with your existing infrastructure or one of the plugins for HTTP, GraphQL, CouchDB, Websocket, WebRTC, Firestore, NATS.
RxDB is based on a storage interface that enables you to swap out the underlying storage engine. This increases code reuse because you can use the same database code for different JavaScript environments by just switching out the storage settings.

diff --git a/docs-src/docs/replication-http.md b/docs-src/docs/replication-http.md new file mode 100644 index 00000000000..fe9f009cf75 --- /dev/null +++ b/docs-src/docs/replication-http.md @@ -0,0 +1,286 @@ +--- +title: HTTP Replication +slug: replication-http.html +description: Learn how to establish HTTP replication between RxDB clients and a Node.js Express server for data synchronization. +--- + + +# HTTP Replication from a custom server to RxDB clients + +While RxDB has a range of backend-specific replication plugins (like [GraphQL](./replication-graphql.md) or [Firestore](./replication-firestore.md)), the replication is build in a way to make it very easy to replicate data from a custom server to RxDB clients. + +

+ HTTP replication +

+ +Using **HTTP** as a transport protocol makes it simple to create a compatible backend on top of your existing infrastructure. For events that must be send from the server to to client, we can use [Server Send Events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events). + +In this tutorial we will implement a HTTP replication between an RxDB client and a MongoDB express server. You can adapt this for any other backend database technologie like PostgreSQL or even a non-Node.js server like go or java. + +To create a compatible server for replication, we will start a server and implement the correct HTTP routes and replication handlers. We need a push-handler, a pull-handler and for the ongoing changes `pull.stream` we use **Server Send Events**. + +## Setup + +RxDB does not have a specific HTTP-replication plugin because the [replication primitives plugin](./replication.md) is simple enough to start a HTTP replication on top of it. +We import the `replicateRxCollection` function and start the replication from there for a single [RxCollection](./rx-collection.md). + +```ts +// > client.ts +import { replicateRxCollection } from 'rxdb/plugins/replication'; +const replicationState = await replicateRxCollection({ + collection: myRxCollection, + replicationIdentifier: 'my-http-replication', + push: { /* add settings from below */ }, + pull: { /* add settings from below */ } +}); +``` + +On the server side, we start an express server that has a MongoDB connection and serves the HTTP requests of the client. + +```ts +// > server.ts +import { MongoClient } from 'mongodb'; +import express from 'express'; +const mongoClient = new MongoClient('mongodb://localhost:27017/'); +const mongoConnection = await mongoClient.connect(); +const mongoDatabase = mongoConnection.db('myDatabase'); +const mongoCollection = await mongoDatabase.collection('myDocs'); + +const app = express(); +app.use(express.json()); + +/* ... add routes from below */ + +app.listen(80, () => { + console.log(`Example app listening on port 80`) +}); +``` + +## Pull from the server to the client + +First we need to implement the pull handler. This is used by the RxDB replication to fetch all documents writes that happened after a given `checkpoint`. + +The `checkpoint` format is not determined by RxDB, instead the server can use any type of changepoint that can be used to iterate across document writes. Here we will just use a unix timestamp `updatedAt` and a string `id`. + +On the client we add the `pull.handler` to the replication setting. The handler request the correct server url and fetches the documents. + +```ts +// > client.ts +const replicationState = await replicateRxCollection({ + /* ... */ + pull: { + async handler(checkpointOrNull, batchSize){ + const updatedAt = checkpointOrNull ? checkpointOrNull.updatedAt : 0; + const id = checkpointOrNull ? checkpointOrNull.id : ''; + const response = await fetch(`https://localhost/pull?updatedAt=${updatedAt}&id=${id}&limit=${batchSize}`); + const data = await response.json(); + return { + documents: data.documents, + checkpoint: data.checkpoint + }; + } + + } + /* ... */ +}); +``` + +The server responds with an array of document data based on the given checkpoint and a new checkpoint. +Also the server has to respect the batchSize so that RxDB knows when there are no more new documents and the server returns a non-full array. + +```ts +// > server.ts +import { lastOfArray } from 'rxdb/plugins/core'; +app.get('/pull', (req, res) => { + const id = req.query.id; + const updatedAt = parseInt(req.query.updatedAt, 10); + const documents = await mongoCollection.find({ + $or: [ + /** + * Notice that we have to compare the updatedAt AND the id field + * because the updateAt field is not unique and when two documents have + * the same updateAt, we can still "sort" them by their id. + */ + { + updateAt: { $gt: updatedAt } + }, + { + updateAt: { $eq: updatedAt } + id: { $gt: id } + } + ] + }).limit(parseInt(req.query.batchSize, 10)).toArray(); + const newCheckpoint = documents.length === 0 ? { id, updatedAt } : { + id: lastOfArray(documents).id, + updatedAt: lastOfArray(documents).updatedAt + }; + res.setHeader('Content-Type', 'application/json'); + res.end(JSON.stringify({ documents, checkpoint: newCheckpoint })); +}); +``` + + +## Push from the Client to the Server + +To send client side writes to the server, we have to implement the `push.handler`. It gets an array of change rows as input and has to return only the conflicting documents that did not have been written to the server. Each change row contains a `newDocumentState` and an optional `assumedMasterState`. + +```ts +// > client.ts +const replicationState = await replicateRxCollection({ + /* ... */ + push: { + async handler(changeRows){ + const rawResponse = await fetch('https://localhost/push', { + method: 'POST', + headers: { + 'Accept': 'application/json', + 'Content-Type': 'application/json' + }, + body: JSON.stringify({ changeRows }) + }); + const conflictsArray = await rawResponse.json(); + return conflictsArray; + } + } + /* ... */ +}); +``` + +On the server we first have to detect if the `assumedMasterState` is correct for each row. If yes, we have to write the new document state to the database, otherwise we have to return the "real" master state in the conflict array. + +**NOTICE:** For simplicity in this tutorial, we do not use transactions. In reality you should run the full push function inside of a MongoDB transaction to ensure that no other process can mix up the document state while the writes are processed. Also you should call batch operations on MongoDB instead of running the operations for each change row. + +The server also creates an `event` that is emitted to the `pullStream$` which is later used in the [pull.stream$](#pullstream-for-ongoing-changes). + +```ts +// > server.ts +import { lastOfArray } from 'rxdb/plugins/core'; +import { Subject } from 'rxjs'; + +// used in the pull.stream$ below +let lastEventId = 0; +const pullStream$ = new Subject(); + +app.get('/push', (req, res) => { + const changeRows = req.body; + const conflicts = []; + const event = { + id: lastEventId++, + documents: [], + checkpoint: null + }; + for(const changeRow of changeRows){ + const realMasterState = mongoCollection.findOne({id: changeRow.newDocumentState.id}); + if( + realMasterState && !changeRow.assumedMasterState || + ( + realMasterState && changeRow.assumedMasterState && + /* + * For simplicity we detect conflicts on the server by only compare the updateAt value. + * In reality you might want to do a more complex check or do a deep-equal comparison. + */ + realMasterState.updatedAt !== changeRow.assumedMasterState.updatedAt + ) + ) { + // we have a conflict + conflicts.push(realMasterState); + } else { + // no conflict -> write the document + mongoCollection.updateOne( + {id: changeRow.newDocumentState.id}, + changeRow.newDocumentState + ); + event.documents.push(changeRow.newDocumentState); + event.checkpoint = { id: changeRow.newDocumentState.id, updatedAt: changeRow.newDocumentState.updatedAt }; + } + } + if(event.documents.length > 0){ + myPullStream$.next(event); + } + res.setHeader('Content-Type', 'application/json'); + res.end(JSON.stringify(conflicts)); +}); +``` + + +## pullStream$ for ongoing changes + +While the normal pull handler is used when the replication is in [iteration mode](./replication.md#checkpoint-iteration), we also need a stream of ongoing changes when the replication is in [event observation mode](./replication.md#event-observation). +The `pull.stream$` is implemented with server send events that are send from the server to the client. + +The client connects to an url and receives server-send-events that contain all ongoing writes. + +```ts +// > client.ts +import { Subject } from 'rxjs'; +const myPullStream$ = new Subject(); +const eventSource = new EventSource('http://localhost/pullStream', { withCredentials: true }); +evtSource.onmessage = event => { + const eventData = JSON.parse(event.data); + myPullStream$.next({ + documents: eventData.documents, + checkpoint: eventData.checkpoint + }); +}; + +const replicationState = await replicateRxCollection({ + /* ... */ + pull: { + /* ... */ + stream$: myPullStream$.asObservable() + } + /* ... */ +}); +``` + + +On the server we have to implement the `pullStream` route and emit the events. We use the `pullStream$` observable from [above](#push-from-the-client-to-the-server) to fetch all ongoing events and respond them to the client. + +```ts +// > server.ts +app.get('/pullStream', (req, res) => { + res.writeHead(200, { + 'Content-Type': 'text/event-stream', + 'Connection': 'keep-alive', + 'Cache-Control': 'no-cache' + }); + const subscription = pullStream$.subscribe(event => res.write('data: ' + JSON.stringify(event))); + req.on('close', () => subscription.unsubscribe()); +}); +``` + +### pullStream$ RESYNC flag + +In case the client looses the connection, the EventSource will automatically reconnect but there might have been some changes that have been missed out in the meantime. The replication has to be informed that it might have missed events by emitting a `RESYNC` flag from the `pull.stream$`. +The replication will then catch up by switching to the [iteration mode](./replication.md#checkpoint-iteration) until it is in sync with the server again. + +```ts +// > client.ts +eventSource.onerror = () => myPullStream$.next('RESYNC'); +``` + +The purpose of the `RESYNC` flag is to tell the client that "something might have changed" and then the client can react on that information without having to run operations in an interval. + +If your backend is not capable of emitting the actual documents and checkpoint in the pull stream, you could just map all events to the RESYNC flag. This would make the replication work with a slight performance drawback: + +```ts +// > client.ts +import { Subject } from 'rxjs'; +const myPullStream$ = new Subject(); +const eventSource = new EventSource('http://localhost/pullStream', { withCredentials: true }); +evtSource.onmessage = () => myPullStream$.next('RESYNC'); +const replicationState = await replicateRxCollection({ + pull: { + stream$: myPullStream$.asObservable() + } +}); +``` + + + +## Missing implementation details + +Here we only covered the basics of doing a HTTP replication between RxDB clients and a server. We did not cover the following aspects of the implementation: + +- Authentication: To authenticate the client on the server, you might want to send authentication headers with the HTTP requests +- Skip events on the `pull.stream$` for the client that caused the changes to improve performance. diff --git a/docs-src/docs/replication-websocket.md b/docs-src/docs/replication-websocket.md index daa0ce5153a..908c662d21a 100644 --- a/docs-src/docs/replication-websocket.md +++ b/docs-src/docs/replication-websocket.md @@ -8,7 +8,7 @@ slug: replication-websocket.html With the websocket replication plugin, you can spawn a websocket server from a RxDB database in Node.js and replicate with it. -**NOTICE**: The websocket replication plugin does not have any concept for authentication or permission handling. It is designed to create an easy **server-to-server** replication. It is **not** made for client-server replication at the moment. +**NOTICE**: The websocket replication plugin does not have any concept for authentication or permission handling. It is designed to create an easy **server-to-server** replication. It is **not** made for client-server replication. Make a pull request if you need that feature. ## Starting the Websocket Server diff --git a/docs-src/docs/replication.md b/docs-src/docs/replication.md index 1fe37497c41..ddfd2cea466 100644 --- a/docs-src/docs/replication.md +++ b/docs-src/docs/replication.md @@ -8,7 +8,7 @@ slug: replication.html The RxDB replication protocol provides the ability to replicate the database state in **realtime** between the clients and the server. The backend server does not have to be a RxDB instance; you can build a replication with **any infrastructure**. -For example you can replicate with a custom GraphQL endpoint or a http server on top of a PostgreSQL database. +For example you can replicate with a custom GraphQL endpoint or a [http server](./replication-http.md) on top of a PostgreSQL database. The replication is made to support the [Offline-First](http://offlinefirst.org/) paradigm, so that when the client goes offline, the RxDB database can still read and write locally and will continue the replication when the client goes online again. diff --git a/docs-src/sidebars.js b/docs-src/sidebars.js index 2cfe84d057f..b014208ed55 100644 --- a/docs-src/sidebars.js +++ b/docs-src/sidebars.js @@ -57,6 +57,7 @@ const sidebars = { label: '🔄 Replication', items: [ 'replication', + 'replication-http', 'replication-graphql', 'replication-websocket', 'replication-couchdb', diff --git a/docs-src/src/pages/index.tsx b/docs-src/src/pages/index.tsx index 3f31c94af81..7947cf92543 100644 --- a/docs-src/src/pages/index.tsx +++ b/docs-src/src/pages/index.tsx @@ -551,8 +551,8 @@ export default function Home() { endpoint which smoothly integrates with your existing infrastructure. Also you can use the replication primitives plugin to create custom replications over any protocol like{' '} - - REST + + HTTP ,{' '} diff --git a/src/plugins/replication/index.ts b/src/plugins/replication/index.ts index abf547d4913..8ea80c5702b 100644 --- a/src/plugins/replication/index.ts +++ b/src/plugins/replication/index.ts @@ -274,7 +274,7 @@ export class RxReplicationState { return row; }) ); - const useRows = useRowsOrNull.filter(arrayFilterNotEmpty); + const useRows: RxReplicationWriteToMasterRow[] = useRowsOrNull.filter(arrayFilterNotEmpty); let result: WithDeleted[] = null as any; diff --git a/src/types/plugins/replication-graphql.d.ts b/src/types/plugins/replication-graphql.d.ts index fa1515b8f84..a9c090857ad 100644 --- a/src/types/plugins/replication-graphql.d.ts +++ b/src/types/plugins/replication-graphql.d.ts @@ -1,8 +1,11 @@ import { RxReplicationWriteToMasterRow } from '../replication-protocol.ts'; import { ById, MaybePromise } from '../util.ts'; import { - ReplicationOptions, ReplicationPullHandlerResult, - ReplicationPullOptions, ReplicationPushHandlerResult, ReplicationPushOptions + ReplicationOptions, + ReplicationPullHandlerResult, + ReplicationPullOptions, + ReplicationPushHandlerResult, + ReplicationPushOptions } from './replication.ts'; export interface RxGraphQLReplicationQueryBuilderResponseObject {