Skip to content

Commit

Permalink
Merge pull request particle-iot#73 from AntonPuko/webhooks
Browse files Browse the repository at this point in the history
implement base WebhookManager functionality
  • Loading branch information
jlkalberer authored Jan 11, 2017
2 parents 413508e + 9085249 commit 6fed190
Show file tree
Hide file tree
Showing 5 changed files with 168 additions and 15 deletions.
29 changes: 19 additions & 10 deletions src/controllers/WebhooksController.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
// @flow

import type {
Repository,
RequestType,
Webhook,
WebhookMutator,
} from '../types';
import type WebhookManager from '../managers/WebhookManager';

import Controller from './Controller';
import HttpError from '../lib/HttpError';
Expand Down Expand Up @@ -34,24 +33,31 @@ const validateWebhookMutator = (webhookMutator: WebhookMutator): ?HttpError => {
};

class WebhooksController extends Controller {
_webhookRepository: Repository<Webhook>;
_webhookManager: WebhookManager;

constructor(webhookRepository: Repository<Webhook>) {
constructor(webhookManager: WebhookManager) {
super();

this._webhookRepository = webhookRepository;
this._webhookManager = webhookManager;
}

@httpVerb('get')
@route('/v1/webhooks')
async getAll(): Promise<*> {
return this.ok(await this._webhookRepository.getAll());
return this.ok(
await this._webhookManager.getAll(this.user.id),
);
}

@httpVerb('get')
@route('/v1/webhooks/:webhookId')
async getById(webhookId: string): Promise<*> {
return this.ok(await this._webhookRepository.getById(webhookId));
return this.ok(
await this._webhookManager.getByID(
webhookId,
this.user.id,
),
);
}

@httpVerb('post')
Expand All @@ -62,7 +68,10 @@ class WebhooksController extends Controller {
throw validateError;
}

const newWebhook = await this._webhookRepository.create(model);
const newWebhook = await this._webhookManager.create({
...model,
ownerID: this.user.id,
});

return this.ok({
created_at: newWebhook.created_at,
Expand All @@ -76,8 +85,8 @@ class WebhooksController extends Controller {
@httpVerb('delete')
@route('/v1/webhooks/:webhookId')
async deleteById(webhookId: string): Promise<*> {
this._webhookRepository.deleteById(webhookId);
return this.ok();
await this._webhookManager.deleteByID(webhookId, this.user.id);
return this.ok({ ok: true });
}
}

Expand Down
8 changes: 7 additions & 1 deletion src/defaultBindings.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import EventsController from './controllers/EventsController';
import ProvisioningController from './controllers/ProvisioningController';
import UsersController from './controllers/UsersController';
import WebhooksController from './controllers/WebhooksController';
import WebhookManager from './managers/WebhookManager';
import EventManager from './managers/EventManager';
import DeviceFirmwareFileRepository from './repository/DeviceFirmwareFileRepository';
import DeviceRepository from './repository/DeviceRepository';
Expand Down Expand Up @@ -61,7 +62,7 @@ export default (container: Container) => {
container.bindClass(
'WebhooksController',
WebhooksController,
Transient.with(['WebhookRepository']),
Transient.with(['WebhookManager']),
);

// managers
Expand All @@ -70,6 +71,11 @@ export default (container: Container) => {
EventManager,
['EventPublisher'],
);
container.bindClass(
'WebhookManager',
WebhookManager,
['WebhookRepository', 'EventPublisher'],
);

// Repositories
container.bindClass(
Expand Down
117 changes: 117 additions & 0 deletions src/managers/WebhookManager.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// @flow

import type {
Event,
Repository,
Webhook,
WebhookMutator,
} from '../types';
import type { EventPublisher } from 'spark-protocol';

import request from 'request';
import logger from '../lib/logger';
import HttpError from '../lib/HttpError';

class WebhookManager {
_eventPublisher: EventPublisher;
_subscriptionIDsByWebhookID: Map<string, string> = new Map();
_webhookRepository: Repository<Webhook>;

constructor(
webhookRepository: Repository<Webhook>,
eventPublisher: EventPublisher,
) {
this._webhookRepository = webhookRepository;
this._eventPublisher = eventPublisher;

(async (): Promise<void> => this._init())();
}

_init = async (): Promise<void> => {
const allWebhooks = await this._webhookRepository.getAll();
allWebhooks.forEach(
(webhook: Webhook): void => this._subscribeWebhook(webhook),
);
};

_onNewWebhookEvent = (webhook: Webhook): (event: Event) => void =>
(event: Event) => {
try {
const responseHandler = (error, response) => {
if (error) {
// todo block the webhook calls after some amount of fails
// on 1 min or so..
// todo responseTemplates
throw error;
}
};

// todo request <-> webhooks options
request({
body: webhook.json,
formData: webhook.form,
headers: webhook.headers,
json: !!webhook.json,
method: webhook.requestType,
url: webhook.url,
}, responseHandler);
} catch (error) {
logger.error(`webhook error: ${error}`);
}
};

_subscribeWebhook = (webhook: Webhook) => {
const subscriptionID = this._eventPublisher.subscribe(
webhook.event,
this._onNewWebhookEvent(webhook),
// todo separate filtering for MY_DEVICES and for public/private events
{
deviceID: webhook.deviceID,
userID: webhook.ownerID,
},
);
this._subscriptionIDsByWebhookID.set(webhook.id, subscriptionID);
};

_unsubscribeWebhookByID = (webhookID: string) => {
const subscriptionID = this._subscriptionIDsByWebhookID.get(webhookID);
if (!subscriptionID) {
return;
}

this._eventPublisher.unsubscribe(subscriptionID);
this._subscriptionIDsByWebhookID.delete(webhookID);
};

create = async (model: WebhookMutator): Promise<Webhook> => {
const webhook = await this._webhookRepository.create(model);
this._subscribeWebhook(webhook);
return webhook;
};

deleteByID = async (
webhookID: string,
userID: string,
): Promise<void> => {
const webhook = await this._webhookRepository.getById(webhookID, userID);
if (!webhook) {
throw new HttpError('no webhook found', 404);
}
await this._webhookRepository.deleteById(webhookID);
this._unsubscribeWebhookByID(webhookID);
};

getAll = async (userID: string): Promise<Array<Webhook>> =>
await this._webhookRepository.getAll(userID);

getByID = async (webhookID: string, userID: string): Promise<Webhook> => {
const webhook = await this._webhookRepository.getById(webhookID, userID);
if (!webhook) {
throw new HttpError('no webhook found', 404);
}

return webhook;
};
}

export default WebhookManager;
28 changes: 24 additions & 4 deletions src/repository/WebhookFileRepository.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,31 @@ class WebhookFileRepository {
deleteById = async (id: string): Promise<void> =>
this._fileManager.deleteFile(`${id}.json`);

getAll = async (): Promise<Array<Webhook>> =>
this._fileManager.getAllData();
getAll = async (userID: ?string = null): Promise<Array<Webhook>> => {
const allData = this._fileManager.getAllData();

getById = async (id: string): Promise<?Webhook> =>
this._fileManager.getFile(`${id}.json`);
if (userID) {
return allData.filter(
(webhook: Webhook): boolean =>
webhook.ownerID === userID,
);
}
return allData;
};

getById = async (
id: string,
userID: ?string = null,
): Promise<?Webhook> => {
const webhook = this._fileManager.getFile(`${id}.json`);
if (
!webhook ||
webhook.ownerID !== userID
) {
return null;
}
return webhook;
};

update = async (model: WebhookMutator): Promise<Webhook> => {
throw new HttpError('Not implemented');
Expand Down
1 change: 1 addition & 0 deletions src/types.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ export type WebhookMutator = {
json?: { [key: string]: Object },
mydevices?: boolean,
noDefaults?: boolean,
ownerID: string,
productIdOrSlug?: string,
query?: { [key: string]: Object },
rejectUnauthorized?: boolean,
Expand Down

0 comments on commit 6fed190

Please sign in to comment.