-
Notifications
You must be signed in to change notification settings - Fork 2
Use http server over local domain socket instead of json-socket #3
Changes from 6 commits
528b615
77cdb73
abaeb7a
9b51d54
b08f7a5
58c63bb
70ab908
4fb945c
9ccf8a9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,126 +1,89 @@ | ||
import { | ||
PackageResponse, | ||
RequestPackageFunction, | ||
RequestPackageOptions, | ||
Resolution, | ||
WantedDependency, | ||
} from '@pnpm/package-requester' | ||
import JsonSocket = require('json-socket') | ||
import net = require('net') | ||
|
||
import got = require('got') | ||
import pLimit = require('p-limit') | ||
import {StoreController} from 'package-store' | ||
import uuid = require('uuid') | ||
|
||
export default function ( | ||
initOpts: object, | ||
initOpts: { | ||
path?: string, | ||
port?: number, | ||
hostname?: string, | ||
}, | ||
): Promise<StoreController> { | ||
const socket = new JsonSocket(new net.Socket()); | ||
socket.connect(initOpts as any) // tslint:disable-line | ||
let remotePrefix: string | ||
if (initOpts.path) { | ||
remotePrefix = `http://unix:${initOpts.path}:` | ||
} else { | ||
remotePrefix = `http://${initOpts.hostname}:${initOpts.port}` | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it would be fine to pass in the formed path, these manipulations can be done by pnpm |
||
} | ||
const limitedRetryFetch = retryFetch.bind(null, pLimit(100)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd make the concurrency configurable |
||
|
||
return new Promise((resolve, reject) => { | ||
socket.on('connect', () => { | ||
const waiters = createWaiters() | ||
|
||
socket.on('message', (message) => { | ||
if (message.err) { | ||
waiters.reject(message.action, message.err) | ||
} else { | ||
waiters.resolve(message.action, message.body) | ||
} | ||
}) | ||
|
||
resolve({ | ||
close: async () => { | ||
socket.end() | ||
}, | ||
prune: async () => { | ||
socket.sendMessage({ | ||
action: 'prune', | ||
}, (err) => err && console.error(err)) | ||
}, | ||
requestPackage: requestPackage.bind(null, socket, waiters), | ||
saveState: async () => { | ||
socket.sendMessage({ | ||
action: 'saveState', | ||
}, (err) => err && console.error(err)) | ||
}, | ||
updateConnections: async (prefix: string, opts: {addDependencies: string[], removeDependencies: string[], prune: boolean}) => { | ||
socket.sendMessage({ | ||
action: 'updateConnections', | ||
args: [prefix, opts], | ||
}, (err) => err && console.error(err)) | ||
}, | ||
}) | ||
resolve({ | ||
close: async () => { return }, | ||
prune: async () => { | ||
await limitedRetryFetch(`${remotePrefix}/prune`, {}) | ||
}, | ||
requestPackage: requestPackage.bind(null, remotePrefix, limitedRetryFetch), | ||
saveState: async () => { | ||
await limitedRetryFetch(`${remotePrefix}/saveState`, {}) | ||
}, | ||
updateConnections: async (prefix: string, opts: {addDependencies: string[], removeDependencies: string[], prune: boolean}) => { | ||
await limitedRetryFetch(`${remotePrefix}/updateConnections`, { | ||
opts, | ||
prefix, | ||
}) | ||
}, | ||
}) | ||
}) | ||
} | ||
|
||
function createWaiters () { | ||
const waiters = {} | ||
return { | ||
add (id: string) { | ||
waiters[id] = deffered() | ||
return waiters[id].promise | ||
}, | ||
resolve (id: string, obj: object) { | ||
if (waiters[id]) { | ||
waiters[id].resolve(obj) | ||
delete waiters[id] | ||
} | ||
}, | ||
reject (id: string, err: object) { | ||
if (waiters[id]) { | ||
waiters[id].reject(err) | ||
delete waiters[id] | ||
} | ||
}, | ||
} | ||
} | ||
|
||
// tslint:disable-next-line | ||
function noop () {} | ||
|
||
function deffered<T> (): { | ||
promise: Promise<T>, | ||
resolve: (v: T) => void, | ||
reject: (err: Error) => void, | ||
} { | ||
let pResolve: (v: T) => void = noop | ||
let pReject: (err: Error) => void = noop | ||
const promise = new Promise<T>((resolve, reject) => { | ||
pResolve = resolve | ||
pReject = reject | ||
function retryFetch (limit: (fn: () => PromiseLike<object>) => Promise<object>, url: string, body: object): Promise<object> { // tslint:disable-line | ||
return limit(async () => { | ||
const response = await got(url, { | ||
body: JSON.stringify(body), | ||
headers: { 'Content-Type': 'application/json' }, | ||
method: 'POST', | ||
}) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you can use p-limit |
||
return JSON.parse(response.body) | ||
}).catch((e) => { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you can use async/await syntax here with a regular try/catch also, I think the try/catch should be inside the function and cover just the |
||
if (!e.message.startsWith('Error: connect ECONNRESET') && !e.message.startsWith('Error: connect ECONNREFUSED')) { | ||
throw e | ||
} | ||
return retryFetch(limit, url, body) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. and is there a way to tell got to retry as many times as it's needed? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes, by default it reties 2 times |
||
}) | ||
return { | ||
promise, | ||
reject: pReject, | ||
resolve: pResolve, | ||
} | ||
} | ||
|
||
function requestPackage ( | ||
socket: JsonSocket, | ||
waiters: object, | ||
remotePrefix: string, | ||
limitedRetryFetch: (url: string, body: object) => any, // tslint:disable-line | ||
wantedDependency: WantedDependency, | ||
options: RequestPackageOptions, | ||
): Promise<PackageResponse> { | ||
const msgId = uuid.v4() | ||
|
||
const fetchingManifest = waiters['add'](`manifestResponse:${msgId}`) // tslint:disable-line | ||
const fetchingFiles = waiters['add'](`packageFilesResponse:${msgId}`) // tslint:disable-line | ||
const response = waiters['add'](`packageResponse:${msgId}`) // tslint:disable-line | ||
.then((packageResponse: object) => { | ||
return Object.assign(packageResponse, { | ||
fetchingFiles, | ||
fetchingManifest, | ||
finishing: Promise.all([fetchingManifest, fetchingFiles]).then(() => undefined), | ||
}) | ||
}) | ||
|
||
socket.sendMessage({ | ||
action: 'requestPackage', | ||
args: [wantedDependency, options], | ||
return limitedRetryFetch(`${remotePrefix}/requestPackage`, { | ||
msgId, | ||
}, (err) => err && console.error(err)) | ||
|
||
return response | ||
options, | ||
wantedDependency, | ||
}) | ||
.then((packageResponse: PackageResponse) => { | ||
const fetchingManifest = limitedRetryFetch(`${remotePrefix}/manifestResponse`, { | ||
msgId, | ||
}) | ||
const fetchingFiles = limitedRetryFetch(`${remotePrefix}/packageFilesResponse`, { | ||
msgId, | ||
}) | ||
return Object.assign(packageResponse, { | ||
fetchingFiles, | ||
fetchingManifest, | ||
finishing: Promise.all([fetchingManifest, fetchingFiles]).then(() => undefined), | ||
}) | ||
}) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,91 +1,84 @@ | ||
import { | ||
RequestPackageFunction, | ||
RequestPackageOptions, | ||
WantedDependency, | ||
} from '@pnpm/package-requester' | ||
import JsonSocket = require('json-socket') | ||
import net = require('net') | ||
import http = require('http') | ||
import {IncomingMessage, Server, ServerResponse} from 'http' | ||
import {StoreController} from 'package-store' | ||
|
||
export default function ( | ||
store: StoreController, | ||
opts: object, | ||
opts: { | ||
path?: string, | ||
port?: number, | ||
hostname?: string, | ||
}, | ||
) { | ||
const server = net.createServer() | ||
server.listen(opts) | ||
server.on('connection', (socket) => { | ||
const jsonSocket = new JsonSocket(socket) | ||
const requestPackage = requestPackageWithCtx.bind(null, {jsonSocket, store}) | ||
const manifestPromises = {} | ||
const filesPromises = {} | ||
|
||
jsonSocket.on('message', async (message) => { | ||
switch (message.action) { | ||
case 'requestPackage': { | ||
await requestPackage(message.msgId, message.args[0], message.args[1]) | ||
return | ||
} | ||
case 'prune': { | ||
await store.prune() | ||
return | ||
} | ||
case 'updateConnections': { | ||
await store.updateConnections(message.args[0], message.args[1]) | ||
return | ||
const server = http.createServer(async (req: IncomingMessage, res: ServerResponse) => { | ||
if (req.method !== 'POST') { | ||
res.statusCode = 503 | ||
res.end(JSON.stringify(`Only POST is allowed, received ${req.method}`)) | ||
return | ||
} | ||
|
||
let body: any = '' // tslint:disable-line | ||
req.on('data', (data) => { | ||
body += data | ||
}) | ||
req.on('end', async () => { | ||
try { | ||
if (body.length > 0) { | ||
body = JSON.parse(body) | ||
} else { | ||
body = {} | ||
} | ||
case 'saveState': { | ||
await store.saveState() | ||
return | ||
|
||
switch (req.url) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the switch statement should be probably executed early, because if an unknown route is requested, there's no need to read the data There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The switch cannot be executed early because we don't have the body until the request has been received, and I don't see any reason to add complexity to wait for the body since we always need the body or it is empty so it doesn't add overhead. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. isn't the URL known from the very start? The URL can be validated before collecting the body There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. and once we validate the URL? We've to wait for the body... so I am not really sure why we should validate the URL first |
||
case '/requestPackage': | ||
const pkgResponse = await store.requestPackage(body.wantedDependency, body.options) | ||
if (!pkgResponse.isLocal) { | ||
manifestPromises[body.msgId] = pkgResponse.fetchingManifest | ||
filesPromises[body.msgId] = pkgResponse.fetchingFiles | ||
} | ||
res.end(JSON.stringify(pkgResponse)) | ||
break | ||
case '/packageFilesResponse': | ||
const filesResponse = await filesPromises[body.msgId] | ||
delete filesPromises[body.msgId] | ||
res.end(JSON.stringify(filesResponse)) | ||
break | ||
case '/manifestResponse': | ||
const manifestResponse = await manifestPromises[body.msgId] | ||
delete manifestPromises[body.msgId] | ||
res.end(JSON.stringify(manifestResponse)) | ||
break | ||
case '/updateConnections': | ||
await store.updateConnections(body.prefix, body.opts) | ||
res.end(JSON.stringify('OK')) | ||
break | ||
case '/prune': | ||
await store.prune() | ||
res.end(JSON.stringify('OK')) | ||
break | ||
case '/saveState': | ||
await store.saveState() | ||
res.end(JSON.stringify('OK')) | ||
break | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if no route was matched, I think a 404 response should be returned There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. right 👍 |
||
} catch (e) { | ||
res.statusCode = 503 | ||
res.end(JSON.stringify(e.message)) | ||
} | ||
}) | ||
}) | ||
|
||
return { | ||
close: () => server.close(), | ||
let listener: Server; | ||
if (opts.path) { | ||
listener = server.listen(opts.path) | ||
} else { | ||
listener = server.listen(opts.port, opts.hostname) | ||
} | ||
} | ||
|
||
async function requestPackageWithCtx ( | ||
ctx: { | ||
jsonSocket: JsonSocket, | ||
store: StoreController, | ||
}, | ||
msgId: string, | ||
wantedDependency: WantedDependency, | ||
options: RequestPackageOptions, | ||
) { | ||
const packageResponse = await ctx.store.requestPackage(wantedDependency, options) // TODO: If this fails, also return the error | ||
ctx.jsonSocket.sendMessage({ | ||
action: `packageResponse:${msgId}`, | ||
body: packageResponse, | ||
}, (err) => err && console.error(err)) | ||
|
||
if (!packageResponse.isLocal) { | ||
packageResponse.fetchingFiles | ||
.then((packageFilesResponse) => { | ||
ctx.jsonSocket.sendMessage({ | ||
action: `packageFilesResponse:${msgId}`, | ||
body: packageFilesResponse, | ||
}, (err) => err && console.error(err)) | ||
}) | ||
.catch((err) => { | ||
ctx.jsonSocket.sendMessage({ | ||
action: `packageFilesResponse:${msgId}`, | ||
err, | ||
}, (merr) => merr && console.error(merr)) | ||
}) | ||
|
||
packageResponse.fetchingManifest | ||
.then((manifestResponse) => { | ||
ctx.jsonSocket.sendMessage({ | ||
action: `manifestResponse:${msgId}`, | ||
body: manifestResponse, | ||
}, (err) => err && console.error(err)) | ||
}) | ||
.catch((err) => { | ||
ctx.jsonSocket.sendMessage({ | ||
action: `manifestResponse:${msgId}`, | ||
err, | ||
}, (merr) => merr && console.error(merr)) | ||
}) | ||
return { | ||
close: () => listener.close(() => { return }), | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
type dependencies of prod dependencies should be also prod dependencies
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok