Skip to content
This repository has been archived by the owner on Aug 18, 2018. It is now read-only.

Use http server over local domain socket instead of json-socket #3

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,11 @@
"@pnpm/logger": "^1.0.0",
"@pnpm/npm-resolver": "^0.3.1",
"@pnpm/tarball-fetcher": "^0.3.1",
"@types/got": "^7.1.6",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

type dependencies of prod dependencies should be also prod dependencies

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

"@types/node": "^8.0.57",
"@types/p-limit": "^1.1.1",
"@types/tape": "^4.2.31",
"@types/uuid": "^3.4.3",
"mos": "^2.0.0-alpha.3",
"mos-plugin-readme": "^1.0.4",
"package-preview": "^1.0.1",
Expand All @@ -58,10 +62,8 @@
},
"dependencies": {
"@pnpm/package-requester": "^0.5.0",
"@types/json-socket": "^0.1.17",
"@types/node": "^8.0.57",
"@types/uuid": "^3.4.3",
"json-socket": "^0.3.0",
"got": "^8.0.1",
"p-limit": "^1.1.0",
"package-store": "^0.12.0",
"uuid": "^3.1.0"
}
Expand Down
161 changes: 62 additions & 99 deletions src/connectStoreController.ts
Original file line number Diff line number Diff line change
@@ -1,126 +1,89 @@
import {
PackageResponse,
RequestPackageFunction,
RequestPackageOptions,
Resolution,
WantedDependency,
} from '@pnpm/package-requester'
import JsonSocket = require('json-socket')
import net = require('net')

import got = require('got')
import pLimit = require('p-limit')
import {StoreController} from 'package-store'
import uuid = require('uuid')

export default function (
initOpts: object,
initOpts: {
path?: string,
port?: number,
hostname?: string,
},
): Promise<StoreController> {
const socket = new JsonSocket(new net.Socket());
socket.connect(initOpts as any) // tslint:disable-line
let remotePrefix: string
if (initOpts.path) {
remotePrefix = `http://unix:${initOpts.path}:`
} else {
remotePrefix = `http://${initOpts.hostname}:${initOpts.port}`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be fine to pass in the formed path, these manipulations can be done by pnpm

}
const limitedRetryFetch = retryFetch.bind(null, pLimit(100))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd make the concurrency configurable


return new Promise((resolve, reject) => {
socket.on('connect', () => {
const waiters = createWaiters()

socket.on('message', (message) => {
if (message.err) {
waiters.reject(message.action, message.err)
} else {
waiters.resolve(message.action, message.body)
}
})

resolve({
close: async () => {
socket.end()
},
prune: async () => {
socket.sendMessage({
action: 'prune',
}, (err) => err && console.error(err))
},
requestPackage: requestPackage.bind(null, socket, waiters),
saveState: async () => {
socket.sendMessage({
action: 'saveState',
}, (err) => err && console.error(err))
},
updateConnections: async (prefix: string, opts: {addDependencies: string[], removeDependencies: string[], prune: boolean}) => {
socket.sendMessage({
action: 'updateConnections',
args: [prefix, opts],
}, (err) => err && console.error(err))
},
})
resolve({
close: async () => { return },
prune: async () => {
await limitedRetryFetch(`${remotePrefix}/prune`, {})
},
requestPackage: requestPackage.bind(null, remotePrefix, limitedRetryFetch),
saveState: async () => {
await limitedRetryFetch(`${remotePrefix}/saveState`, {})
},
updateConnections: async (prefix: string, opts: {addDependencies: string[], removeDependencies: string[], prune: boolean}) => {
await limitedRetryFetch(`${remotePrefix}/updateConnections`, {
opts,
prefix,
})
},
})
})
}

function createWaiters () {
const waiters = {}
return {
add (id: string) {
waiters[id] = deffered()
return waiters[id].promise
},
resolve (id: string, obj: object) {
if (waiters[id]) {
waiters[id].resolve(obj)
delete waiters[id]
}
},
reject (id: string, err: object) {
if (waiters[id]) {
waiters[id].reject(err)
delete waiters[id]
}
},
}
}

// tslint:disable-next-line
function noop () {}

function deffered<T> (): {
promise: Promise<T>,
resolve: (v: T) => void,
reject: (err: Error) => void,
} {
let pResolve: (v: T) => void = noop
let pReject: (err: Error) => void = noop
const promise = new Promise<T>((resolve, reject) => {
pResolve = resolve
pReject = reject
function retryFetch (limit: (fn: () => PromiseLike<object>) => Promise<object>, url: string, body: object): Promise<object> { // tslint:disable-line
return limit(async () => {
const response = await got(url, {
body: JSON.stringify(body),
headers: { 'Content-Type': 'application/json' },
method: 'POST',
})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can use p-limit

return JSON.parse(response.body)
}).catch((e) => {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can use async/await syntax here with a regular try/catch

also, I think the try/catch should be inside the function and cover just the got() call, as errors by JSON.parse will always be passed through

if (!e.message.startsWith('Error: connect ECONNRESET') && !e.message.startsWith('Error: connect ECONNREFUSED')) {
throw e
}
return retryFetch(limit, url, body)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

got() supports retries out of the box, so I guess this logic is no longer needed.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and is there a way to tell got to retry as many times as it's needed?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, by default it reties 2 times
https://www.npmjs.com/package/got#retries

})
return {
promise,
reject: pReject,
resolve: pResolve,
}
}

function requestPackage (
socket: JsonSocket,
waiters: object,
remotePrefix: string,
limitedRetryFetch: (url: string, body: object) => any, // tslint:disable-line
wantedDependency: WantedDependency,
options: RequestPackageOptions,
): Promise<PackageResponse> {
const msgId = uuid.v4()

const fetchingManifest = waiters['add'](`manifestResponse:${msgId}`) // tslint:disable-line
const fetchingFiles = waiters['add'](`packageFilesResponse:${msgId}`) // tslint:disable-line
const response = waiters['add'](`packageResponse:${msgId}`) // tslint:disable-line
.then((packageResponse: object) => {
return Object.assign(packageResponse, {
fetchingFiles,
fetchingManifest,
finishing: Promise.all([fetchingManifest, fetchingFiles]).then(() => undefined),
})
})

socket.sendMessage({
action: 'requestPackage',
args: [wantedDependency, options],
return limitedRetryFetch(`${remotePrefix}/requestPackage`, {
msgId,
}, (err) => err && console.error(err))

return response
options,
wantedDependency,
})
.then((packageResponse: PackageResponse) => {
const fetchingManifest = limitedRetryFetch(`${remotePrefix}/manifestResponse`, {
msgId,
})
const fetchingFiles = limitedRetryFetch(`${remotePrefix}/packageFilesResponse`, {
msgId,
})
return Object.assign(packageResponse, {
fetchingFiles,
fetchingManifest,
finishing: Promise.all([fetchingManifest, fetchingFiles]).then(() => undefined),
})
})
}
143 changes: 68 additions & 75 deletions src/createServer.ts
Original file line number Diff line number Diff line change
@@ -1,91 +1,84 @@
import {
RequestPackageFunction,
RequestPackageOptions,
WantedDependency,
} from '@pnpm/package-requester'
import JsonSocket = require('json-socket')
import net = require('net')
import http = require('http')
import {IncomingMessage, Server, ServerResponse} from 'http'
import {StoreController} from 'package-store'

export default function (
store: StoreController,
opts: object,
opts: {
path?: string,
port?: number,
hostname?: string,
},
) {
const server = net.createServer()
server.listen(opts)
server.on('connection', (socket) => {
const jsonSocket = new JsonSocket(socket)
const requestPackage = requestPackageWithCtx.bind(null, {jsonSocket, store})
const manifestPromises = {}
const filesPromises = {}

jsonSocket.on('message', async (message) => {
switch (message.action) {
case 'requestPackage': {
await requestPackage(message.msgId, message.args[0], message.args[1])
return
}
case 'prune': {
await store.prune()
return
}
case 'updateConnections': {
await store.updateConnections(message.args[0], message.args[1])
return
const server = http.createServer(async (req: IncomingMessage, res: ServerResponse) => {
if (req.method !== 'POST') {
res.statusCode = 503
res.end(JSON.stringify(`Only POST is allowed, received ${req.method}`))
return
}

let body: any = '' // tslint:disable-line
req.on('data', (data) => {
body += data
})
req.on('end', async () => {
try {
if (body.length > 0) {
body = JSON.parse(body)
} else {
body = {}
}
case 'saveState': {
await store.saveState()
return

switch (req.url) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the switch statement should be probably executed early, because if an unknown route is requested, there's no need to read the data

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The switch cannot be executed early because we don't have the body until the request has been received, and I don't see any reason to add complexity to wait for the body since we always need the body or it is empty so it doesn't add overhead.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isn't the URL known from the very start? The URL can be validated before collecting the body

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and once we validate the URL? We've to wait for the body... so I am not really sure why we should validate the URL first

case '/requestPackage':
const pkgResponse = await store.requestPackage(body.wantedDependency, body.options)
if (!pkgResponse.isLocal) {
manifestPromises[body.msgId] = pkgResponse.fetchingManifest
filesPromises[body.msgId] = pkgResponse.fetchingFiles
}
res.end(JSON.stringify(pkgResponse))
break
case '/packageFilesResponse':
const filesResponse = await filesPromises[body.msgId]
delete filesPromises[body.msgId]
res.end(JSON.stringify(filesResponse))
break
case '/manifestResponse':
const manifestResponse = await manifestPromises[body.msgId]
delete manifestPromises[body.msgId]
res.end(JSON.stringify(manifestResponse))
break
case '/updateConnections':
await store.updateConnections(body.prefix, body.opts)
res.end(JSON.stringify('OK'))
break
case '/prune':
await store.prune()
res.end(JSON.stringify('OK'))
break
case '/saveState':
await store.saveState()
res.end(JSON.stringify('OK'))
break
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if no route was matched, I think a 404 response should be returned

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right 👍

} catch (e) {
res.statusCode = 503
res.end(JSON.stringify(e.message))
}
})
})

return {
close: () => server.close(),
let listener: Server;
if (opts.path) {
listener = server.listen(opts.path)
} else {
listener = server.listen(opts.port, opts.hostname)
}
}

async function requestPackageWithCtx (
ctx: {
jsonSocket: JsonSocket,
store: StoreController,
},
msgId: string,
wantedDependency: WantedDependency,
options: RequestPackageOptions,
) {
const packageResponse = await ctx.store.requestPackage(wantedDependency, options) // TODO: If this fails, also return the error
ctx.jsonSocket.sendMessage({
action: `packageResponse:${msgId}`,
body: packageResponse,
}, (err) => err && console.error(err))

if (!packageResponse.isLocal) {
packageResponse.fetchingFiles
.then((packageFilesResponse) => {
ctx.jsonSocket.sendMessage({
action: `packageFilesResponse:${msgId}`,
body: packageFilesResponse,
}, (err) => err && console.error(err))
})
.catch((err) => {
ctx.jsonSocket.sendMessage({
action: `packageFilesResponse:${msgId}`,
err,
}, (merr) => merr && console.error(merr))
})

packageResponse.fetchingManifest
.then((manifestResponse) => {
ctx.jsonSocket.sendMessage({
action: `manifestResponse:${msgId}`,
body: manifestResponse,
}, (err) => err && console.error(err))
})
.catch((err) => {
ctx.jsonSocket.sendMessage({
action: `manifestResponse:${msgId}`,
err,
}, (merr) => merr && console.error(merr))
})
return {
close: () => listener.close(() => { return }),
}
}
2 changes: 0 additions & 2 deletions test/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import {
import createResolver from '@pnpm/npm-resolver'
import createFetcher from '@pnpm/tarball-fetcher'
import createStore from 'package-store'
import net = require('net')
import JsonSocket = require('json-socket')

test('server', async t => {
const registry = 'https://registry.npmjs.org/'
Expand Down