Skip to content
This repository has been archived by the owner on Aug 18, 2018. It is now read-only.

Use http server over local domain socket instead of json-socket #3

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@
"@pnpm/logger": "^1.0.0",
"@pnpm/npm-resolver": "^0.3.1",
"@pnpm/tarball-fetcher": "^0.3.1",
"@types/body-parser": "^1.16.8",
"@types/express": "^4.0.39",
"@types/request-promise-native": "^1.0.10",
"@types/tape": "^4.2.31",
"mos": "^2.0.0-alpha.3",
"mos-plugin-readme": "^1.0.4",
Expand All @@ -61,8 +64,12 @@
"@types/json-socket": "^0.1.17",
"@types/node": "^8.0.57",
"@types/uuid": "^3.4.3",
"json-socket": "^0.3.0",
"express": "^4.16.2",
"body-parser": "^1.18.2",
"fastify": "^0.35.7",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it doesn't seem like you use fastify in code

"package-store": "^0.12.0",
"request": "^2.83.0",
"request-promise-native": "^1.0.5",
"uuid": "^3.1.0"
}
}
185 changes: 89 additions & 96 deletions src/connectStoreController.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,122 +5,115 @@ import {
Resolution,
WantedDependency,
} from '@pnpm/package-requester'
import JsonSocket = require('json-socket')
import net = require('net')

import request = require('request-promise-native')
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use node-fetch instead


import {StoreController} from 'package-store'
import uuid = require('uuid')

export default function (
initOpts: object,
initOpts: {
path: string;
},
): Promise<StoreController> {
const socket = new JsonSocket(new net.Socket());
socket.connect(initOpts as any) // tslint:disable-line
const remotePrefix = `http://unix:${initOpts.path}:`

return new Promise((resolve, reject) => {
socket.on('connect', () => {
const waiters = createWaiters()

socket.on('message', (message) => {
if (message.err) {
waiters.reject(message.action, message.err)
} else {
waiters.resolve(message.action, message.body)
}
})

resolve({
close: async () => {
socket.end()
},
prune: async () => {
socket.sendMessage({
action: 'prune',
}, (err) => err && console.error(err))
},
requestPackage: requestPackage.bind(null, socket, waiters),
saveState: async () => {
socket.sendMessage({
action: 'saveState',
}, (err) => err && console.error(err))
},
updateConnections: async (prefix: string, opts: {addDependencies: string[], removeDependencies: string[], prune: boolean}) => {
socket.sendMessage({
action: 'updateConnections',
args: [prefix, opts],
}, (err) => err && console.error(err))
},
})
resolve({
close: async () => { return },
prune: async () => {
await retryRequest({
json: true,
method: 'POST',
url: `${remotePrefix}/prune`,
})
},
requestPackage: requestPackage.bind(null, remotePrefix),
saveState: async () => {
await retryRequest({
json: true,
method: 'POST',
url: `${remotePrefix}/saveState`,
})
},
updateConnections: async (prefix: string, opts: {addDependencies: string[], removeDependencies: string[], prune: boolean}) => {
await retryRequest({
body: {
opts,
prefix,
},
json: true,
method: 'POST',
url: `${remotePrefix}/updateConnections`,
})
},
})
})
}

function createWaiters () {
const waiters = {}
return {
add (id: string) {
waiters[id] = deffered()
return waiters[id].promise
},
resolve (id: string, obj: object) {
if (waiters[id]) {
waiters[id].resolve(obj)
delete waiters[id]
}
},
reject (id: string, err: object) {
if (waiters[id]) {
waiters[id].reject(err)
delete waiters[id]
}
},
}
}

// tslint:disable-next-line
function noop () {}
let inflightCount = 0
let errorCount = 0

function deffered<T> (): {
promise: Promise<T>,
resolve: (v: T) => void,
reject: (err: Error) => void,
} {
let pResolve: (v: T) => void = noop
let pReject: (err: Error) => void = noop
const promise = new Promise<T>((resolve, reject) => {
pResolve = resolve
pReject = reject
})
return {
promise,
reject: pReject,
resolve: pResolve,
function retryRequest (options: any): any { // tslint:disable-line
if (inflightCount > 100) {
return new Promise((resolve, reject) => {
setTimeout(resolve, 100)
}).then(() => {
return retryRequest(options)
})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can use p-limit

}
inflightCount += 1
return request(options).catch((e) => {
inflightCount -= 1
if (!e.message.startsWith('Error: connect ECONNRESET') && !e.message.startsWith('Error: connect ECONNREFUSED')) {
throw e
}
console.log('again', errorCount++, inflightCount)
return retryRequest(options)
}).then((data) => {
inflightCount -= 1
return data
})
}

function requestPackage (
socket: JsonSocket,
waiters: object,
remotePrefix: string,
wantedDependency: WantedDependency,
options: RequestPackageOptions,
): Promise<PackageResponse> {
const msgId = uuid.v4()

const fetchingManifest = waiters['add'](`manifestResponse:${msgId}`) // tslint:disable-line
const fetchingFiles = waiters['add'](`packageFilesResponse:${msgId}`) // tslint:disable-line
const response = waiters['add'](`packageResponse:${msgId}`) // tslint:disable-line
.then((packageResponse: object) => {
return Object.assign(packageResponse, {
fetchingFiles,
fetchingManifest,
finishing: Promise.all([fetchingManifest, fetchingFiles]).then(() => undefined),
})
return retryRequest({
body: {
msgId,
options,
wantedDependency,
},
json: true,
method: 'POST',
url: `${remotePrefix}/requestPackage`,
})
.then((packageResponse: PackageResponse) => {
const fetchingManifest = retryRequest({
body: {
msgId,
},
json: true,
method: 'POST',
url: `${remotePrefix}/manifestResponse`,
})

socket.sendMessage({
action: 'requestPackage',
args: [wantedDependency, options],
msgId,
}, (err) => err && console.error(err))

return response
const fetchingFiles = retryRequest({
body: {
msgId,
},
json: true,
method: 'POST',
url: `${remotePrefix}/packageFilesResponse`,
})
return Object.assign(packageResponse, {
fetchingFiles,
fetchingManifest,
finishing: Promise.all([fetchingManifest, fetchingFiles]).then(() => undefined),
})
})
}
128 changes: 56 additions & 72 deletions src/createServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,89 +3,73 @@ import {
RequestPackageOptions,
WantedDependency,
} from '@pnpm/package-requester'
import JsonSocket = require('json-socket')
import net = require('net')

import bodyParser = require('body-parser')
import {Request, Response} from 'express';
import express = require('express')
import http = require('http')

import {StoreController} from 'package-store'

export default function (
store: StoreController,
opts: object,
opts: {
path: string;
},
) {
const server = net.createServer()
server.listen(opts)
server.on('connection', (socket) => {
const jsonSocket = new JsonSocket(socket)
const requestPackage = requestPackageWithCtx.bind(null, {jsonSocket, store})
const packageResponses = {}

const app = express()
app.use(bodyParser.json({
limit: '10mb',
}))

app.post('/requestPackage', async (request: Request, response: Response) => {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of the separate actions, it might be faster to have a single route with a switch inside.

const {msgId, wantedDependency, options} = request.body
if (!packageResponses[msgId]) {
packageResponses[msgId] = await store.requestPackage(wantedDependency, options)
}
response.json(packageResponses[msgId])
})

app.post('/packageFilesResponse', async (request: Request, response: Response) => {
const fetchingFiles = await packageResponses[request.body.msgId].fetchingFiles
delete packageResponses[request.body.msgId].fetchingFiles
garbageCollectResponses(packageResponses, request.body.msgId)
response.json(fetchingFiles)
})

app.post('/manifestResponse', async (request: Request, response: Response) => {
const fetchingManifest = await packageResponses[request.body.msgId].fetchingManifest
delete packageResponses[request.body.msgId].fetchingManifest
garbageCollectResponses(packageResponses, request.body.msgId)
response.json(fetchingManifest)
})

jsonSocket.on('message', async (message) => {
switch (message.action) {
case 'requestPackage': {
await requestPackage(message.msgId, message.args[0], message.args[1])
return
}
case 'prune': {
await store.prune()
return
}
case 'updateConnections': {
await store.updateConnections(message.args[0], message.args[1])
return
}
case 'saveState': {
await store.saveState()
return
}
}
})
app.post('/prune', async (request: Request, response: Response) => {
await store.prune()
response.json('OK')
})

app.post('/saveState', async (request: Request, response: Response) => {
await store.saveState()
response.json('OK')
})

app.post('/updateConnections', async (request: Request, response: Response) => {
await store.updateConnections(request.body.prefix, request.body.opts)
response.json('OK')
})

const listener = app.listen(opts.path)

return {
close: () => server.close(),
close: () => listener.close(() => { return }),
}
}

async function requestPackageWithCtx (
ctx: {
jsonSocket: JsonSocket,
store: StoreController,
},
msgId: string,
wantedDependency: WantedDependency,
options: RequestPackageOptions,
) {
const packageResponse = await ctx.store.requestPackage(wantedDependency, options) // TODO: If this fails, also return the error
ctx.jsonSocket.sendMessage({
action: `packageResponse:${msgId}`,
body: packageResponse,
}, (err) => err && console.error(err))

if (!packageResponse.isLocal) {
packageResponse.fetchingFiles
.then((packageFilesResponse) => {
ctx.jsonSocket.sendMessage({
action: `packageFilesResponse:${msgId}`,
body: packageFilesResponse,
}, (err) => err && console.error(err))
})
.catch((err) => {
ctx.jsonSocket.sendMessage({
action: `packageFilesResponse:${msgId}`,
err,
}, (merr) => merr && console.error(merr))
})

packageResponse.fetchingManifest
.then((manifestResponse) => {
ctx.jsonSocket.sendMessage({
action: `manifestResponse:${msgId}`,
body: manifestResponse,
}, (err) => err && console.error(err))
})
.catch((err) => {
ctx.jsonSocket.sendMessage({
action: `manifestResponse:${msgId}`,
err,
}, (merr) => merr && console.error(merr))
})
function garbageCollectResponses (packageResponses: object, msgId: string) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can have 2 separate dictionaries and you won't need this function

if (!packageResponses[msgId].fetchingFiles && !packageResponses[msgId].fetchingManifest) {
delete packageResponses[msgId]
}
}