diff --git a/src/controllers/WebhooksController.js b/src/controllers/WebhooksController.js index 44f18c48..f22e9ad4 100644 --- a/src/controllers/WebhooksController.js +++ b/src/controllers/WebhooksController.js @@ -1,11 +1,10 @@ // @flow import type { - Repository, RequestType, - Webhook, WebhookMutator, } from '../types'; +import type WebhookManager from '../managers/WebhookManager'; import Controller from './Controller'; import HttpError from '../lib/HttpError'; @@ -34,24 +33,31 @@ const validateWebhookMutator = (webhookMutator: WebhookMutator): ?HttpError => { }; class WebhooksController extends Controller { - _webhookRepository: Repository; + _webhookManager: WebhookManager; - constructor(webhookRepository: Repository) { + constructor(webhookManager: WebhookManager) { super(); - this._webhookRepository = webhookRepository; + this._webhookManager = webhookManager; } @httpVerb('get') @route('/v1/webhooks') async getAll(): Promise<*> { - return this.ok(await this._webhookRepository.getAll()); + return this.ok( + await this._webhookManager.getAll(this.user.id), + ); } @httpVerb('get') @route('/v1/webhooks/:webhookId') async getById(webhookId: string): Promise<*> { - return this.ok(await this._webhookRepository.getById(webhookId)); + return this.ok( + await this._webhookManager.getByID( + webhookId, + this.user.id, + ), + ); } @httpVerb('post') @@ -62,7 +68,10 @@ class WebhooksController extends Controller { throw validateError; } - const newWebhook = await this._webhookRepository.create(model); + const newWebhook = await this._webhookManager.create({ + ...model, + ownerID: this.user.id, + }); return this.ok({ created_at: newWebhook.created_at, @@ -76,8 +85,8 @@ class WebhooksController extends Controller { @httpVerb('delete') @route('/v1/webhooks/:webhookId') async deleteById(webhookId: string): Promise<*> { - this._webhookRepository.deleteById(webhookId); - return this.ok(); + await this._webhookManager.deleteByID(webhookId, this.user.id); + return this.ok({ ok: true }); } } diff --git a/src/defaultBindings.js b/src/defaultBindings.js index b579057c..495b6d53 100644 --- a/src/defaultBindings.js +++ b/src/defaultBindings.js @@ -10,6 +10,7 @@ import EventsController from './controllers/EventsController'; import ProvisioningController from './controllers/ProvisioningController'; import UsersController from './controllers/UsersController'; import WebhooksController from './controllers/WebhooksController'; +import WebhookManager from './managers/WebhookManager'; import EventManager from './managers/EventManager'; import DeviceFirmwareFileRepository from './repository/DeviceFirmwareFileRepository'; import DeviceRepository from './repository/DeviceRepository'; @@ -61,7 +62,7 @@ export default (container: Container) => { container.bindClass( 'WebhooksController', WebhooksController, - Transient.with(['WebhookRepository']), + Transient.with(['WebhookManager']), ); // managers @@ -70,6 +71,11 @@ export default (container: Container) => { EventManager, ['EventPublisher'], ); + container.bindClass( + 'WebhookManager', + WebhookManager, + ['WebhookRepository', 'EventPublisher'], + ); // Repositories container.bindClass( diff --git a/src/managers/WebhookManager.js b/src/managers/WebhookManager.js new file mode 100644 index 00000000..d9afa0f4 --- /dev/null +++ b/src/managers/WebhookManager.js @@ -0,0 +1,117 @@ +// @flow + +import type { + Event, + Repository, + Webhook, + WebhookMutator, +} from '../types'; +import type { EventPublisher } from 'spark-protocol'; + +import request from 'request'; +import logger from '../lib/logger'; +import HttpError from '../lib/HttpError'; + +class WebhookManager { + _eventPublisher: EventPublisher; + _subscriptionIDsByWebhookID: Map = new Map(); + _webhookRepository: Repository; + + constructor( + webhookRepository: Repository, + eventPublisher: EventPublisher, + ) { + this._webhookRepository = webhookRepository; + this._eventPublisher = eventPublisher; + + (async (): Promise => this._init())(); + } + + _init = async (): Promise => { + const allWebhooks = await this._webhookRepository.getAll(); + allWebhooks.forEach( + (webhook: Webhook): void => this._subscribeWebhook(webhook), + ); + }; + + _onNewWebhookEvent = (webhook: Webhook): (event: Event) => void => + (event: Event) => { + try { + const responseHandler = (error, response) => { + if (error) { + // todo block the webhook calls after some amount of fails + // on 1 min or so.. + // todo responseTemplates + throw error; + } + }; + + // todo request <-> webhooks options + request({ + body: webhook.json, + formData: webhook.form, + headers: webhook.headers, + json: !!webhook.json, + method: webhook.requestType, + url: webhook.url, + }, responseHandler); + } catch (error) { + logger.error(`webhook error: ${error}`); + } + }; + + _subscribeWebhook = (webhook: Webhook) => { + const subscriptionID = this._eventPublisher.subscribe( + webhook.event, + this._onNewWebhookEvent(webhook), + // todo separate filtering for MY_DEVICES and for public/private events + { + deviceID: webhook.deviceID, + userID: webhook.ownerID, + }, + ); + this._subscriptionIDsByWebhookID.set(webhook.id, subscriptionID); + }; + + _unsubscribeWebhookByID = (webhookID: string) => { + const subscriptionID = this._subscriptionIDsByWebhookID.get(webhookID); + if (!subscriptionID) { + return; + } + + this._eventPublisher.unsubscribe(subscriptionID); + this._subscriptionIDsByWebhookID.delete(webhookID); + }; + + create = async (model: WebhookMutator): Promise => { + const webhook = await this._webhookRepository.create(model); + this._subscribeWebhook(webhook); + return webhook; + }; + + deleteByID = async ( + webhookID: string, + userID: string, + ): Promise => { + const webhook = await this._webhookRepository.getById(webhookID, userID); + if (!webhook) { + throw new HttpError('no webhook found', 404); + } + await this._webhookRepository.deleteById(webhookID); + this._unsubscribeWebhookByID(webhookID); + }; + + getAll = async (userID: string): Promise> => + await this._webhookRepository.getAll(userID); + + getByID = async (webhookID: string, userID: string): Promise => { + const webhook = await this._webhookRepository.getById(webhookID, userID); + if (!webhook) { + throw new HttpError('no webhook found', 404); + } + + return webhook; + }; +} + +export default WebhookManager; diff --git a/src/repository/WebhookFileRepository.js b/src/repository/WebhookFileRepository.js index 035761d6..fc6af4a2 100644 --- a/src/repository/WebhookFileRepository.js +++ b/src/repository/WebhookFileRepository.js @@ -32,11 +32,31 @@ class WebhookFileRepository { deleteById = async (id: string): Promise => this._fileManager.deleteFile(`${id}.json`); - getAll = async (): Promise> => - this._fileManager.getAllData(); + getAll = async (userID: ?string = null): Promise> => { + const allData = this._fileManager.getAllData(); - getById = async (id: string): Promise => - this._fileManager.getFile(`${id}.json`); + if (userID) { + return allData.filter( + (webhook: Webhook): boolean => + webhook.ownerID === userID, + ); + } + return allData; + }; + + getById = async ( + id: string, + userID: ?string = null, + ): Promise => { + const webhook = this._fileManager.getFile(`${id}.json`); + if ( + !webhook || + webhook.ownerID !== userID + ) { + return null; + } + return webhook; + }; update = async (model: WebhookMutator): Promise => { throw new HttpError('Not implemented'); diff --git a/src/types.js b/src/types.js index 6e8a7664..86386212 100644 --- a/src/types.js +++ b/src/types.js @@ -18,6 +18,7 @@ export type WebhookMutator = { json?: { [key: string]: Object }, mydevices?: boolean, noDefaults?: boolean, + ownerID: string, productIdOrSlug?: string, query?: { [key: string]: Object }, rejectUnauthorized?: boolean,