Skip to content

Commit

Permalink
ADD docs for http replication (#5503)
Browse files Browse the repository at this point in the history
* ADD docs for http replication

* ADD info

* ADD changelog
  • Loading branch information
pubkey authored Jan 14, 2024
1 parent 71ba879 commit 8792e31
Show file tree
Hide file tree
Showing 9 changed files with 299 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

<!-- CHANGELOG NEWEST -->

- ADD tutorial on [how to start a HTTP replication with a custom server](https://rxdb.info/replication-http.html)
- FIX `.count()` broken on key-compression plugin [#5492](https://github.com/pubkey/rxdb/pull/5492)
- UPDATE dexie to version `4.0.1-beta.6` [#5469](https://github.com/pubkey/rxdb/pull/5469)

Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
RxDB (short for <b>R</b>eactive <b>D</b>ata<b>b</b>ase) is a <a href="https://rxdb.info/offline-first.html">local-first</a>, NoSQL-database for JavaScript Applications like Websites, hybrid Apps, Electron-Apps, Progressive Web Apps, Deno and <a href="https://rxdb.info/nodejs-database.html">Node.js</a>.
Reactive means that you can not only query the current state, but <b>subscribe</b> to all state changes like the result of a query or even a single field of a document.
This is great for UI-based <b>realtime</b> applications in a way that makes it easy to develop and also has great performance benefits but can also be used to create fast backends in Node.js.<br />
RxDB provides an easy to implement <a href="https://rxdb.info/replication.html">protocol</a> for realtime <b>replication</b> with your existing infrastructure or one of the plugins for <a href="https://rxdb.info/replication-graphql.html">GraphQL</a>, <a href="https://rxdb.info/replication-couchdb.html">CouchDB</a>, <a href="https://rxdb.info/replication-websocket.html">Websocket</a>, <a href="https://rxdb.info/replication-webrtc.html">WebRTC</a>, <a href="https://rxdb.info/replication-firestore.html">Firestore</a>, <a href="https://rxdb.info/replication-nats.html">NATS</a>.<br />
RxDB provides an easy to implement <a href="https://rxdb.info/replication.html">protocol</a> for realtime <b>replication</b> with your existing infrastructure or one of the plugins for <a href="https://rxdb.info/replication-http.html">HTTP</a>, <a href="https://rxdb.info/replication-graphql.html">GraphQL</a>, <a href="https://rxdb.info/replication-couchdb.html">CouchDB</a>, <a href="https://rxdb.info/replication-websocket.html">Websocket</a>, <a href="https://rxdb.info/replication-webrtc.html">WebRTC</a>, <a href="https://rxdb.info/replication-firestore.html">Firestore</a>, <a href="https://rxdb.info/replication-nats.html">NATS</a>.<br />
RxDB is based on a storage interface that enables you to swap out the underlying storage engine. This increases <b>code reuse</b> because you can use the same database code for different JavaScript environments by just switching out the storage settings.
</p>

Expand Down
286 changes: 286 additions & 0 deletions docs-src/docs/replication-http.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,286 @@
---
title: HTTP Replication
slug: replication-http.html
description: Learn how to establish HTTP replication between RxDB clients and a Node.js Express server for data synchronization.
---


# HTTP Replication from a custom server to RxDB clients

While RxDB has a range of backend-specific replication plugins (like [GraphQL](./replication-graphql.md) or [Firestore](./replication-firestore.md)), the replication is build in a way to make it very easy to replicate data from a custom server to RxDB clients.

<p align="center">
<img src="./files/icons/with-gradient/replication.svg" alt="HTTP replication" height="60" />
</p>

Using **HTTP** as a transport protocol makes it simple to create a compatible backend on top of your existing infrastructure. For events that must be send from the server to to client, we can use [Server Send Events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events).

In this tutorial we will implement a HTTP replication between an RxDB client and a MongoDB express server. You can adapt this for any other backend database technologie like PostgreSQL or even a non-Node.js server like go or java.

To create a compatible server for replication, we will start a server and implement the correct HTTP routes and replication handlers. We need a push-handler, a pull-handler and for the ongoing changes `pull.stream` we use **Server Send Events**.

## Setup

RxDB does not have a specific HTTP-replication plugin because the [replication primitives plugin](./replication.md) is simple enough to start a HTTP replication on top of it.
We import the `replicateRxCollection` function and start the replication from there for a single [RxCollection](./rx-collection.md).

```ts
// > client.ts
import { replicateRxCollection } from 'rxdb/plugins/replication';
const replicationState = await replicateRxCollection({
collection: myRxCollection,
replicationIdentifier: 'my-http-replication',
push: { /* add settings from below */ },
pull: { /* add settings from below */ }
});
```

On the server side, we start an express server that has a MongoDB connection and serves the HTTP requests of the client.

```ts
// > server.ts
import { MongoClient } from 'mongodb';
import express from 'express';
const mongoClient = new MongoClient('mongodb://localhost:27017/');
const mongoConnection = await mongoClient.connect();
const mongoDatabase = mongoConnection.db('myDatabase');
const mongoCollection = await mongoDatabase.collection('myDocs');

const app = express();
app.use(express.json());

/* ... add routes from below */

app.listen(80, () => {
console.log(`Example app listening on port 80`)
});
```

## Pull from the server to the client

First we need to implement the pull handler. This is used by the RxDB replication to fetch all documents writes that happened after a given `checkpoint`.

The `checkpoint` format is not determined by RxDB, instead the server can use any type of changepoint that can be used to iterate across document writes. Here we will just use a unix timestamp `updatedAt` and a string `id`.

On the client we add the `pull.handler` to the replication setting. The handler request the correct server url and fetches the documents.

```ts
// > client.ts
const replicationState = await replicateRxCollection({
/* ... */
pull: {
async handler(checkpointOrNull, batchSize){
const updatedAt = checkpointOrNull ? checkpointOrNull.updatedAt : 0;
const id = checkpointOrNull ? checkpointOrNull.id : '';
const response = await fetch(`https://localhost/pull?updatedAt=${updatedAt}&id=${id}&limit=${batchSize}`);
const data = await response.json();
return {
documents: data.documents,
checkpoint: data.checkpoint
};
}

}
/* ... */
});
```

The server responds with an array of document data based on the given checkpoint and a new checkpoint.
Also the server has to respect the batchSize so that RxDB knows when there are no more new documents and the server returns a non-full array.

```ts
// > server.ts
import { lastOfArray } from 'rxdb/plugins/core';
app.get('/pull', (req, res) => {
const id = req.query.id;
const updatedAt = parseInt(req.query.updatedAt, 10);
const documents = await mongoCollection.find({
$or: [
/**
* Notice that we have to compare the updatedAt AND the id field
* because the updateAt field is not unique and when two documents have
* the same updateAt, we can still "sort" them by their id.
*/
{
updateAt: { $gt: updatedAt }
},
{
updateAt: { $eq: updatedAt }
id: { $gt: id }
}
]
}).limit(parseInt(req.query.batchSize, 10)).toArray();
const newCheckpoint = documents.length === 0 ? { id, updatedAt } : {
id: lastOfArray(documents).id,
updatedAt: lastOfArray(documents).updatedAt
};
res.setHeader('Content-Type', 'application/json');
res.end(JSON.stringify({ documents, checkpoint: newCheckpoint }));
});
```


## Push from the Client to the Server

To send client side writes to the server, we have to implement the `push.handler`. It gets an array of change rows as input and has to return only the conflicting documents that did not have been written to the server. Each change row contains a `newDocumentState` and an optional `assumedMasterState`.

```ts
// > client.ts
const replicationState = await replicateRxCollection({
/* ... */
push: {
async handler(changeRows){
const rawResponse = await fetch('https://localhost/push', {
method: 'POST',
headers: {
'Accept': 'application/json',
'Content-Type': 'application/json'
},
body: JSON.stringify({ changeRows })
});
const conflictsArray = await rawResponse.json();
return conflictsArray;
}
}
/* ... */
});
```

On the server we first have to detect if the `assumedMasterState` is correct for each row. If yes, we have to write the new document state to the database, otherwise we have to return the "real" master state in the conflict array.

**NOTICE:** For simplicity in this tutorial, we do not use transactions. In reality you should run the full push function inside of a MongoDB transaction to ensure that no other process can mix up the document state while the writes are processed. Also you should call batch operations on MongoDB instead of running the operations for each change row.

The server also creates an `event` that is emitted to the `pullStream$` which is later used in the [pull.stream$](#pullstream-for-ongoing-changes).

```ts
// > server.ts
import { lastOfArray } from 'rxdb/plugins/core';
import { Subject } from 'rxjs';

// used in the pull.stream$ below
let lastEventId = 0;
const pullStream$ = new Subject();

app.get('/push', (req, res) => {
const changeRows = req.body;
const conflicts = [];
const event = {
id: lastEventId++,
documents: [],
checkpoint: null
};
for(const changeRow of changeRows){
const realMasterState = mongoCollection.findOne({id: changeRow.newDocumentState.id});
if(
realMasterState && !changeRow.assumedMasterState ||
(
realMasterState && changeRow.assumedMasterState &&
/*
* For simplicity we detect conflicts on the server by only compare the updateAt value.
* In reality you might want to do a more complex check or do a deep-equal comparison.
*/
realMasterState.updatedAt !== changeRow.assumedMasterState.updatedAt
)
) {
// we have a conflict
conflicts.push(realMasterState);
} else {
// no conflict -> write the document
mongoCollection.updateOne(
{id: changeRow.newDocumentState.id},
changeRow.newDocumentState
);
event.documents.push(changeRow.newDocumentState);
event.checkpoint = { id: changeRow.newDocumentState.id, updatedAt: changeRow.newDocumentState.updatedAt };
}
}
if(event.documents.length > 0){
myPullStream$.next(event);
}
res.setHeader('Content-Type', 'application/json');
res.end(JSON.stringify(conflicts));
});
```


## pullStream$ for ongoing changes

While the normal pull handler is used when the replication is in [iteration mode](./replication.md#checkpoint-iteration), we also need a stream of ongoing changes when the replication is in [event observation mode](./replication.md#event-observation).
The `pull.stream$` is implemented with server send events that are send from the server to the client.

The client connects to an url and receives server-send-events that contain all ongoing writes.

```ts
// > client.ts
import { Subject } from 'rxjs';
const myPullStream$ = new Subject();
const eventSource = new EventSource('http://localhost/pullStream', { withCredentials: true });
evtSource.onmessage = event => {
const eventData = JSON.parse(event.data);
myPullStream$.next({
documents: eventData.documents,
checkpoint: eventData.checkpoint
});
};

const replicationState = await replicateRxCollection({
/* ... */
pull: {
/* ... */
stream$: myPullStream$.asObservable()
}
/* ... */
});
```


On the server we have to implement the `pullStream` route and emit the events. We use the `pullStream$` observable from [above](#push-from-the-client-to-the-server) to fetch all ongoing events and respond them to the client.

```ts
// > server.ts
app.get('/pullStream', (req, res) => {
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Connection': 'keep-alive',
'Cache-Control': 'no-cache'
});
const subscription = pullStream$.subscribe(event => res.write('data: ' + JSON.stringify(event)));
req.on('close', () => subscription.unsubscribe());
});
```

### pullStream$ RESYNC flag

In case the client looses the connection, the EventSource will automatically reconnect but there might have been some changes that have been missed out in the meantime. The replication has to be informed that it might have missed events by emitting a `RESYNC` flag from the `pull.stream$`.
The replication will then catch up by switching to the [iteration mode](./replication.md#checkpoint-iteration) until it is in sync with the server again.

```ts
// > client.ts
eventSource.onerror = () => myPullStream$.next('RESYNC');
```

The purpose of the `RESYNC` flag is to tell the client that "something might have changed" and then the client can react on that information without having to run operations in an interval.

If your backend is not capable of emitting the actual documents and checkpoint in the pull stream, you could just map all events to the RESYNC flag. This would make the replication work with a slight performance drawback:

```ts
// > client.ts
import { Subject } from 'rxjs';
const myPullStream$ = new Subject();
const eventSource = new EventSource('http://localhost/pullStream', { withCredentials: true });
evtSource.onmessage = () => myPullStream$.next('RESYNC');
const replicationState = await replicateRxCollection({
pull: {
stream$: myPullStream$.asObservable()
}
});
```



## Missing implementation details

Here we only covered the basics of doing a HTTP replication between RxDB clients and a server. We did not cover the following aspects of the implementation:

- Authentication: To authenticate the client on the server, you might want to send authentication headers with the HTTP requests
- Skip events on the `pull.stream$` for the client that caused the changes to improve performance.
2 changes: 1 addition & 1 deletion docs-src/docs/replication-websocket.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ slug: replication-websocket.html
With the websocket replication plugin, you can spawn a websocket server from a RxDB database in Node.js and replicate with it.


**NOTICE**: The websocket replication plugin does not have any concept for authentication or permission handling. It is designed to create an easy **server-to-server** replication. It is **not** made for client-server replication at the moment.
**NOTICE**: The websocket replication plugin does not have any concept for authentication or permission handling. It is designed to create an easy **server-to-server** replication. It is **not** made for client-server replication. Make a pull request if you need that feature.


## Starting the Websocket Server
Expand Down
2 changes: 1 addition & 1 deletion docs-src/docs/replication.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ slug: replication.html
The RxDB replication protocol provides the ability to replicate the database state in **realtime** between the clients and the server.

The backend server does not have to be a RxDB instance; you can build a replication with **any infrastructure**.
For example you can replicate with a custom GraphQL endpoint or a http server on top of a PostgreSQL database.
For example you can replicate with a custom GraphQL endpoint or a [http server](./replication-http.md) on top of a PostgreSQL database.

The replication is made to support the [Offline-First](http://offlinefirst.org/) paradigm, so that when the client goes offline, the RxDB database can still read and write locally and will continue the replication when the client goes online again.

Expand Down
1 change: 1 addition & 0 deletions docs-src/sidebars.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ const sidebars = {
label: '🔄 Replication',
items: [
'replication',
'replication-http',
'replication-graphql',
'replication-websocket',
'replication-couchdb',
Expand Down
4 changes: 2 additions & 2 deletions docs-src/src/pages/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -551,8 +551,8 @@ export default function Home() {
endpoint which smoothly integrates with your existing infrastructure.
Also you can use the replication primitives plugin to create custom
replications over any protocol like{' '}
<a href="/replication.html" target="_blank">
REST
<a href="/replication-http.html" target="_blank">
HTTP
</a>
,{' '}
<a href="/replication-websocket.html" target="_blank">
Expand Down
2 changes: 1 addition & 1 deletion src/plugins/replication/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ export class RxReplicationState<RxDocType, CheckpointType> {
return row;
})
);
const useRows = useRowsOrNull.filter(arrayFilterNotEmpty);
const useRows: RxReplicationWriteToMasterRow<RxDocType>[] = useRowsOrNull.filter(arrayFilterNotEmpty);

let result: WithDeleted<RxDocType>[] = null as any;

Expand Down
7 changes: 5 additions & 2 deletions src/types/plugins/replication-graphql.d.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import { RxReplicationWriteToMasterRow } from '../replication-protocol.ts';
import { ById, MaybePromise } from '../util.ts';
import {
ReplicationOptions, ReplicationPullHandlerResult,
ReplicationPullOptions, ReplicationPushHandlerResult, ReplicationPushOptions
ReplicationOptions,
ReplicationPullHandlerResult,
ReplicationPullOptions,
ReplicationPushHandlerResult,
ReplicationPushOptions
} from './replication.ts';

export interface RxGraphQLReplicationQueryBuilderResponseObject {
Expand Down

0 comments on commit 8792e31

Please sign in to comment.