diff --git a/.gitignore b/.gitignore index 0d475b2..54e948e 100644 --- a/.gitignore +++ b/.gitignore @@ -3,4 +3,6 @@ build coverage .nyc_output data/member-a/blobs -data/member-b/blobs \ No newline at end of file +data/member-a/destinations +data/member-b/blobs +data/member-b/destinations \ No newline at end of file diff --git a/src/handlers/blobs.ts b/src/handlers/blobs.ts index 0b53815..8b4addd 100644 --- a/src/handlers/blobs.ts +++ b/src/handlers/blobs.ts @@ -91,12 +91,12 @@ export const deliverBlob = async ({ blobPath, recipientID, recipientURL, request let sender = peerID; if(senderDestination !== undefined) { formData.append('senderDestination', senderDestination); - sender += utils.constants.ID_SEGMENT_SEPARATOR + senderDestination + sender += '/' + senderDestination } let recipient = recipientID; if(recipientDestination !== undefined) { formData.append('recipientDestination', recipientDestination); - recipient += utils.constants.ID_SEGMENT_SEPARATOR + recipientDestination; + recipient += '/' + recipientDestination; } formData.append('blob', stream); const httpsAgent = new https.Agent({ cert, key, ca }); diff --git a/src/handlers/messages.ts b/src/handlers/messages.ts index 878d8bb..8c6a487 100644 --- a/src/handlers/messages.ts +++ b/src/handlers/messages.ts @@ -48,12 +48,12 @@ export const deliverMessage = async ({ message, recipientID, recipientURL, reque let sender = peerID; if(senderDestination !== undefined) { formData.append('senderDestination', senderDestination); - sender += utils.constants.ID_SEGMENT_SEPARATOR + senderDestination + sender += '/' + senderDestination } let recipient = recipientID; if(recipientDestination !== undefined) { formData.append('recipientDestination', recipientDestination); - recipient += utils.constants.ID_SEGMENT_SEPARATOR + recipientDestination; + recipient += '/' + recipientDestination; } formData.append('message', message); log.trace(`Delivering message to ${recipient} at ${recipientURL}`); diff --git a/src/lib/config.ts b/src/lib/config.ts index b02a328..fd71dcd 100644 --- a/src/lib/config.ts +++ b/src/lib/config.ts @@ -28,6 +28,7 @@ const ajv = new Ajv(); const validateConfig = ajv.compile(configSchema); const configFilePath = path.join(utils.constants.DATA_DIRECTORY, utils.constants.CONFIG_FILE_NAME); const peersFilePath = path.join(utils.constants.DATA_DIRECTORY, utils.constants.PEERS_FILE_NAME); +const destinationsFilePath = path.join(utils.constants.DATA_DIRECTORY, utils.constants.DESTINATIONS_FILE_NAME); export let config: IConfig; @@ -39,6 +40,18 @@ const loadConfig = async () => { try { log.debug(`Reading config file ${configFilePath}`); const data = JSON.parse(await fs.readFile(configFilePath, 'utf8')); + try { + log.debug(`Reading destinations file ${destinationsFilePath}`); + data.destinations = JSON.parse(await fs.readFile(destinationsFilePath, 'utf8')); + } catch (err: any) { + // if file does not exist, just set destinations to an empty array + log.debug(`Error code when reading destinations file ${err.code}`); + if (err.code === 'ENOENT') { + data.destinations = data.destinations; + } else { + throw err; + } + } try { log.debug(`Reading peers file ${peersFilePath}`); data.peers = JSON.parse(await fs.readFile(peersFilePath, 'utf8')); @@ -68,28 +81,32 @@ const loadConfig = async () => { }; export const persistPeers = async () => { - await ensurePeersDirectoryExists(); + await ensureDirectoryExists(peersFilePath); await fs.writeFile(peersFilePath, JSON.stringify(config.peers, null, 2)); }; +export const persistDestinations = async () => { + await ensureDirectoryExists(destinationsFilePath); + await fs.writeFile(destinationsFilePath, JSON.stringify(config.destinations, null, 2)); +}; -const ensurePeersDirectoryExists = async () => { +const ensureDirectoryExists = async (directory: string) => { try { - await fs.access(peersFilePath); + await fs.access(directory); } catch(err: any) { if(err.code === 'ENOENT') { - await createPeersDirectory(); + await createDirectory(directory); } else { - log.warn(`Could not check for existence of peers subdirectory ${err.code}`); + log.warn(`Could not check for existence of ${err.code}`); } } }; -const createPeersDirectory = async () => { +const createDirectory = async (directory: string) => { try { - await fs.mkdir(path.parse(peersFilePath).dir, { recursive: true }); - log.info('Peers subdirectory created'); + await fs.mkdir(path.parse(directory).dir, { recursive: true }); + log.info(`Directory ${directory} created`); } catch(err: any) { - log.error(`Failed to create peers subdirectory ${err.code}`); + log.error(`Failed to create directory ${directory} ${err.code}`); } }; \ No newline at end of file diff --git a/src/lib/interfaces.ts b/src/lib/interfaces.ts index a16b8b7..a04b481 100644 --- a/src/lib/interfaces.ts +++ b/src/lib/interfaces.ts @@ -29,9 +29,11 @@ export interface IConfig { endpoint?: string } apiKey?: string + destinations?: string[] peers: { id: string endpoint: string + destinations?: string[] }[] jsonParserLimit?: string } diff --git a/src/lib/utils.ts b/src/lib/utils.ts index 1b607b7..719666e 100644 --- a/src/lib/utils.ts +++ b/src/lib/utils.ts @@ -36,6 +36,7 @@ export const constants = { CERT_FILE: 'cert.pem', KEY_FILE: 'key.pem', CA_FILE: 'ca.pem', + DESTINATIONS_FILE_NAME: 'destinations/data.json', TRANSFER_HASH_ALGORITHM: 'sha256', REST_API_CALL_MAX_ATTEMPTS: 5, REST_API_CALL_RETRY_DELAY_MS: 500, @@ -46,8 +47,7 @@ export const constants = { SIZE_HEADER_NAME: 'dx-size', LAST_UPDATE_HEADER_NAME: 'dx-last-update', DEFAULT_JSON_PARSER_LIMIT: '1mb', - DEFAULT_MAX_INFLIGHT: 100, - ID_SEGMENT_SEPARATOR: '/' + DEFAULT_MAX_INFLIGHT: 100 }; const log = new Logger('utils.ts'); axios.defaults.timeout = constants.REST_API_CALL_REQUEST_TIMEOUT; @@ -140,7 +140,7 @@ export const axiosWithRetry = async (config: AxiosRequestConfig) => { const data = err.response?.data; log.error(`${config.method} ${config.url} attempt ${attempts} [${err.response?.status}]`, (data && !data.on) ? data : err.stack); if (err.response?.status === 404) { - throw err; + throw data.error ? new Error(data.error) : err; } else { currentError = err; attempts++; diff --git a/src/routers/api.ts b/src/routers/api.ts index bf1c3b5..4ad33dd 100644 --- a/src/routers/api.ts +++ b/src/routers/api.ts @@ -23,7 +23,7 @@ import * as blobsHandler from '../handlers/blobs'; import * as eventsHandler from '../handlers/events'; import * as messagesHandler from '../handlers/messages'; import { ca, cert, certBundle, key, peerID } from '../lib/cert'; -import { config, persistPeers } from '../lib/config'; +import { config, persistDestinations, persistPeers } from '../lib/config'; import { IStatus } from '../lib/interfaces'; import RequestError from '../lib/request-error'; import * as utils from '../lib/utils'; @@ -41,7 +41,8 @@ router.get('/id', async (_req, res, next) => { res.send({ id: peerID, endpoint: config.p2p.endpoint ?? `https://${config.p2p.hostname}:${config.p2p.port}`, - cert: certBundle + cert: certBundle, + destinations: config.destinations }); } catch (err) { next(err); @@ -82,24 +83,42 @@ router.get('/peers', (_req, res) => { res.send(config.peers); }); -router.put('/peers/:id', async (req, res, next) => { +router.put('/peers/:id/:destination?', async (req, res, next) => { try { - if (req.body.endpoint === undefined) { - throw new RequestError('Missing endpoint', 400); - } - if (req.body.cert !== undefined) { - await fs.writeFile(path.join(utils.constants.DATA_DIRECTORY, utils.constants.PEER_CERTS_SUBDIRECTORY, `${req.params.id}.pem`), req.body.cert); - } - let peer = config.peers.find(peer => peer.id === req.params.id); - if (peer === undefined) { - peer = { - id: req.params.id, - endpoint: req.body.endpoint - }; - config.peers.push(peer); + if (req.params.id === peerID) { + if (req.params.destination !== undefined) { + if (config.destinations === undefined) { + config.destinations = [req.params.destination] + } else if (!config.destinations.includes(req.params.destination)) { + config.destinations.push(req.params.destination); + } + await persistDestinations(); + } + } else { + let peer = config.peers.find(peer => peer.id === req.params.id); + if (peer === undefined) { + if (req.body.endpoint === undefined) { + throw new RequestError('Missing endpoint', 400); + } + peer = { + id: req.params.id, + endpoint: req.body.endpoint + }; + config.peers.push(peer); + } + if (req.params.destination !== undefined) { + if (peer.destinations === undefined) { + peer.destinations = [req.params.destination]; + } else if (!peer.destinations.includes(req.params.destination)) { + peer.destinations.push(req.params.destination); + } + } + await persistPeers(); + if (req.body.cert !== undefined) { + await fs.writeFile(path.join(utils.constants.DATA_DIRECTORY, utils.constants.PEER_CERTS_SUBDIRECTORY, `${req.params.id}.pem`), req.body.cert); + await refreshCACerts(); + } } - await persistPeers(); - await refreshCACerts(); res.send({ status: 'added' }); } catch (err) { next(err); @@ -133,40 +152,46 @@ router.post('/messages', async (req, res, next) => { } let senderDestination: string | undefined = undefined; if (typeof req.body.sender === 'string') { - if (!req.body.sender.startsWith(peerID)) { - throw new RequestError('Invalid sender'); - } else { - const destination = req.body.sender.substring(peerID.length + 1); - if(destination.length > 0) { - senderDestination = destination; + const segments = req.body.sender.split('/'); + if (segments[0] !== peerID) { + throw new RequestError(`Sender ID mismatch expected=${peerID} recieved=${segments[0]}`, 400); + } + if (segments.length > 1) { + if (!config.destinations?.includes(segments[1])) { + throw new RequestError(`Unknown sender destination expected=${config.destinations?.join('|') ?? 'none'} recieved=${segments[1]}`, 400); } + senderDestination = segments[1]; } } let recipientID: string; let recipientDestination: string | undefined = undefined; if (typeof req.body.recipient === 'string') { - const index = req.body.recipient.indexOf(utils.constants.ID_SEGMENT_SEPARATOR); - if (index !== -1) { - recipientID = req.body.recipient.substring(0, index); - const destination = req.body.recipient.substring(index + 1); - if(destination.length > 0) { - recipientDestination = destination; - } - } else { - recipientID = req.body.recipient; + const segments = req.body.recipient.split('/'); + recipientID = segments[0]; + if (segments.length > 1) { + recipientDestination = segments[1]; } } else { throw new RequestError('Missing recipient', 400); } - let recipientURL = config.peers.find(peer => peer.id === recipientID)?.endpoint; - if (recipientURL === undefined) { - throw new RequestError(`Unknown recipient`, 400); + let recipientEndpoint: string; + if (recipientID === peerID) { + recipientEndpoint = config.p2p.endpoint ?? `https://${config.p2p.hostname}:${config.p2p.port}`; + } else { + let recipientPeer = config.peers.find(peer => peer.id === recipientID); + if (recipientPeer === undefined) { + throw new RequestError(`Unknown recipient ${recipientID}`, 400); + } + recipientEndpoint = recipientPeer.endpoint; + if (recipientDestination !== undefined && !recipientPeer.destinations?.includes(recipientDestination)) { + throw new RequestError(`Unknown recipient destination expected=${recipientPeer.destinations?.join('|') ?? 'none'} recieved=${recipientDestination}`, 400); + } } let requestId = uuidV4(); if (typeof req.body.requestId === 'string') { requestId = req.body.requestId; } - messagesHandler.sendMessage(req.body.message, recipientID, recipientURL, requestId, senderDestination, recipientDestination); + messagesHandler.sendMessage(req.body.message, recipientID, recipientEndpoint, requestId, senderDestination, recipientDestination); res.send({ requestId }); } catch (err) { next(err); @@ -231,40 +256,45 @@ router.post('/transfers', async (req, res, next) => { await blobsHandler.retrieveMetadata(req.body.path); let senderDestination: string | undefined = undefined; if (typeof req.body.sender === 'string') { - if (!req.body.sender.startsWith(peerID)) { - throw new RequestError('Invalid sender'); - } else { - const destination = req.body.sender.substring(peerID.length + 1); - if(destination.length > 0) { - senderDestination = destination; + const segments = req.body.sender.split('/'); + if (segments[0] !== peerID) { + throw new RequestError(`Sender ID mismatch expected=${peerID} recieved=${segments[0]}`, 400); + } + if (segments.length > 1) { + if (!config.destinations?.includes(segments[1])) { + throw new RequestError(`Unknown sender destination expected=${config.destinations?.join('|')} recieved=${segments[1]}`, 400); } + senderDestination = segments[1]; } } let recipientID: string; let recipientDestination: string | undefined = undefined; if (typeof req.body.recipient === 'string') { - const index = req.body.recipient.indexOf(utils.constants.ID_SEGMENT_SEPARATOR); - if (index !== -1) { - recipientID = req.body.recipient.substring(0, index); - const destination = req.body.recipient.substring(index + 1); - if(destination.length > 0) { - recipientDestination = destination; - } - } else { - recipientID = req.body.recipient; + const segments = req.body.recipient.split('/'); + recipientID = segments[0]; + if (segments.length > 1) { + recipientDestination = segments[1]; } } else { throw new RequestError('Missing recipient', 400); } - let recipientURL = config.peers.find(peer => peer.id === recipientID)?.endpoint; - if (recipientURL === undefined) { - throw new RequestError(`Unknown recipient`, 400); + let recipientEndpoint: string; + recipientEndpoint = config.p2p.endpoint ?? `https://${config.p2p.hostname}:${config.p2p.port}`; + if (recipientID === peerID) { + } else { + let recipientPeer = config.peers.find(peer => peer.id === recipientID); + if (recipientPeer === undefined) { + throw new RequestError(`Unknown recipient`, 400); + } + if (recipientDestination !== undefined && !recipientPeer.destinations?.includes(recipientDestination)) { + throw new RequestError(`Unknown recipient destination expected=${recipientPeer.destinations?.join('|')} recieved=${recipientDestination}`, 400); + } } let requestId = uuidV4(); if (typeof req.body.requestId === 'string') { requestId = req.body.requestId; } - blobsHandler.sendBlob(req.body.path, recipientID, recipientURL, requestId, senderDestination, recipientDestination); + blobsHandler.sendBlob(req.body.path, recipientID, recipientEndpoint, requestId, senderDestination, recipientDestination); res.send({ requestId }); } catch (err) { next(err); diff --git a/src/routers/p2p.ts b/src/routers/p2p.ts index c8bfe70..9b97c50 100644 --- a/src/routers/p2p.ts +++ b/src/routers/p2p.ts @@ -22,6 +22,8 @@ import { IBlobReceivedEvent, IMessageReceivedEvent } from '../lib/interfaces'; import { v4 as uuidV4 } from 'uuid'; import { queueEvent } from '../handlers/events'; import { peerID } from '../lib/cert'; +import { config } from '../lib/config'; +import RequestError from '../lib/request-error'; export const router = Router(); @@ -34,11 +36,15 @@ router.post('/messages', async (req: Request, res, next) => { let sender = utils.extractPeerSenderFromRequest(req); const { senderDestination, recipientDestination, message } = await utils.extractMessageFromMultipartForm(req); if (senderDestination !== undefined) { - sender += utils.constants.ID_SEGMENT_SEPARATOR + senderDestination; + validateSenderDestination(sender, senderDestination); + sender += '/' + senderDestination; } let recipient = peerID; if (recipientDestination !== undefined) { - recipient += utils.constants.ID_SEGMENT_SEPARATOR + recipientDestination; + if (!config.destinations?.includes(recipientDestination)) { + throw new RequestError(`Unknown recipient destination expected=${config.destinations?.join('|') ?? 'none'} recieved=${recipientDestination}`, 404); + } + recipient += '/' + recipientDestination; } await queueEvent({ id: uuidV4(), @@ -58,11 +64,15 @@ router.put('/blobs/*', async (req: Request, res, next) => { let sender = utils.extractPeerSenderFromRequest(req); const { file, senderDestination, recipientDestination } = await utils.extractFileFromMultipartForm(req); if (senderDestination !== undefined) { - sender += utils.constants.ID_SEGMENT_SEPARATOR + senderDestination; + validateSenderDestination(sender, senderDestination); + sender += '/' + senderDestination; } let recipient = peerID; if (recipientDestination !== undefined) { - recipient += utils.constants.ID_SEGMENT_SEPARATOR + recipientDestination; + if (!config.destinations?.includes(recipientDestination)) { + throw new RequestError(`Unknown recipient destination expected=${config.destinations?.join('|') ?? 'none'} recieved=${recipientDestination}`, 404); + } + recipient += '/' + recipientDestination; } const blobPath = path.join(utils.constants.RECEIVED_BLOBS_SUBDIRECTORY, sender, req.params[0]); const metadata = await blobsHandler.storeBlob(file, blobPath); @@ -81,3 +91,19 @@ router.put('/blobs/*', async (req: Request, res, next) => { next(err); } }); + +const validateSenderDestination = (sender: string, senderDestination: string) => { + if (sender === peerID) { + if (!config.destinations?.includes(senderDestination)) { + throw new RequestError(`Unknown sender destination expected=${config.destinations?.join('|') ?? 'none'} recieved=${senderDestination}`, 404); + } + } else { + const peer = config.peers.find(peer => peer.id === sender); + if (peer === undefined) { + throw new RequestError(`Unknown sender ${sender}`, 404); + } + if (!peer.destinations?.includes(senderDestination)) { + throw new RequestError(`Unknown sender destination expected=${peer.destinations?.join('|') ?? 'none'} recieved=${senderDestination}`, 404); + } + } +}; \ No newline at end of file diff --git a/src/schemas/config.json b/src/schemas/config.json index ec67ebf..5bc4bf0 100644 --- a/src/schemas/config.json +++ b/src/schemas/config.json @@ -43,6 +43,12 @@ "apiKey": { "type": "string" }, + "destinations": { + "type": "array", + "items": { + "type": "string" + } + }, "peers": { "type": "array", "items": { @@ -57,6 +63,12 @@ }, "endpoint": { "type": "string" + }, + "destinations": { + "type": "array", + "items": { + "type": "string" + } } } }