7 Apr 2022

Direct-to-S3 Node.js samples


With the announcement of OSS migrating to the Direct-to-S3 approach, we want to help you make this transition smoother. This time we will start with the Node.js utility for the new binary transfer in Autodesk Forge services. These samples are built using the LTS version of Node.js.

The team is also working on a new SDK that will use the Direct-to-S3 approach.

Our teammate Petr Broz put together a curated utility file that includes all the newly released endpoints for the OSS Direct-to-S3 approach.

The GitHub repository can be found here, and the Node.js branch of that repo is available here.

index.js (Utility File)

const axios = require('axios');
const rax = require('retry-axios');

class BinaryTransferClient {
    /**
     * Creates a new instance of the binary transfer helper client.
     *
     * Note that the provided access token will be used for all requests initiated
     * by this client. For long-running operations the token could potentially expire,
     * so consider modifying this class to refresh the token whenever needed.
     *
     * @param {string} token Access token to use when communicating with Autodesk Forge services.
     * @param {string} [host="https://developer.api.autodesk.com"] Optional Autodesk Forge host.
     */
    constructor(token, host) {
        this.token = token;
        this.axios = axios.create({
            baseURL: (host || 'https://developer.api.autodesk.com') + '/oss/v2/'
        });
        // Attach an interceptor to the axios instance that will retry response codes 100-199, 429, and 500-599.
        // For default settings, see https://github.com/JustinBeckwith/retry-axios#usage.
        this.axios.defaults.raxConfig = {
            instance: this.axios
        };
        rax.attach(this.axios);
    }

    /**
     * Generates one or more signed URLs that can be used to upload a file (or its parts) to OSS,
     * and an upload key that is used to generate additional URLs or in {@see _completeUpload}
     * after all the parts have been uploaded successfully.
     *
     * Note that if you are uploading in multiple parts, each part except for the final one
     * must be of size at least 5MB, otherwise the call to {@see _completeUpload} will fail.
     *
     * @async
     * @param {string} bucketKey Bucket key.
     * @param {string} objectKey Object key.
     * @param {number} [parts=1] How many URLs to generate in case of multi-part upload.
     * @param {number} [firstPart=1] Index of the part the first returned URL should point to.
     * For example, to upload parts 10 through 15 of a file, use `firstPart` = 10 and `parts` = 6.
     * @param {string} [uploadKey] Optional upload key if this is a continuation of a previously
     * initiated upload.
     * @param {number} [minutesExpiration] Custom expiration for the upload URLs
     * (within the 1 to 60 minutes range). If not specified, default is 2 minutes.
     * @returns {Promise<object>} Signed URLs for uploading chunks of the file to AWS S3,
     * and a unique upload key used to generate additional URLs or to complete the upload.
     */
    async _getUploadUrls(bucketKey, objectKey, parts = 1, firstPart = 1, uploadKey, minutesExpiration) {
        let endpoint = `buckets/${bucketKey}/objects/${encodeURIComponent(objectKey)}/signeds3upload?parts=${parts}&firstPart=${firstPart}`;
        if (uploadKey) {
            endpoint += `&uploadKey=${uploadKey}`;
        }
        if (minutesExpiration) {
            endpoint += `&minutesExpiration=${minutesExpiration}`;
        }
        const headers = {
            'Content-Type': 'application/json',
            'Authorization': 'Bearer ' + this.token
        };
        const resp = await this.axios.get(endpoint, { headers });
        return resp.data;
    }

    /**
     * Finalizes the upload of a file to OSS.
     *
     * @async
     * @param {string} bucketKey Bucket key.
     * @param {string} objectKey Object key.
     * @param {string} uploadKey Upload key returned by {@see _getUploadUrls}.
     * @param {string} [contentType] Optional content type that should be recorded for the uploaded file.
     * @returns {Promise<object>} Details of the created object in OSS.
     */
    async _completeUpload(bucketKey, objectKey, uploadKey, contentType) {
        const endpoint = `buckets/${bucketKey}/objects/${encodeURIComponent(objectKey)}/signeds3upload`;
        const payload = { uploadKey };
        const headers = {
            'Content-Type': 'application/json',
            'Authorization': 'Bearer ' + this.token
        };
        if (contentType) {
            headers['x-ads-meta-Content-Type'] = contentType;
        }
        const resp = await this.axios.post(endpoint, payload, { headers });
        return resp.data;
    }

    /**
     * Uploads content to a specific bucket object.
     *
     * @async
     * @param {string} bucketKey Bucket key.
     * @param {string} objectKey Name of uploaded object.
     * @param {Buffer} data Object content.
     * @param {object} [options] Additional upload options.
     * @param {string} [options.contentType] Optional content type of the uploaded file.
     * @param {number} [options.minutesExpiration] Custom expiration for the upload URLs
     * (within the 1 to 60 minutes range). If not specified, default is 2 minutes.
     * @returns {Promise<object>} Object description containing 'bucketKey', 'objectKey', 'objectId',
     * 'sha1', 'size', 'location', and 'contentType'.
     * @throws Error when the request fails, for example, due to insufficient rights, or incorrect scopes.
     */
    async uploadObject(bucketKey, objectKey, data, options) {
        console.assert(data.byteLength > 0);
        const ChunkSize = 5 << 20;
        const MaxBatches = 25;
        const totalParts = Math.ceil(data.byteLength / ChunkSize);
        let partsUploaded = 0;
        let uploadUrls = [];
        let uploadKey;
        while (partsUploaded < totalParts) {
            const chunk = data.slice(partsUploaded * ChunkSize, Math.min((partsUploaded + 1) * ChunkSize, data.byteLength));
            while (true) {
                console.debug('Uploading part', partsUploaded + 1);
                if (uploadUrls.length === 0) {
                    // Automatically retries 429 and 500-599 responses
                    const uploadParams = await this._getUploadUrls(bucketKey, objectKey, Math.min(totalParts - partsUploaded, MaxBatches), partsUploaded + 1, uploadKey, options?.minutesExpiration);
                    uploadUrls = uploadParams.urls.slice();
                    uploadKey = uploadParams.uploadKey;
                }
                const url = uploadUrls.shift();
                try {
                    await this.axios.put(url, chunk);
                    break;
                } catch (err) {
                    const status = err.response?.status;
                    if (status === 403) {
                        console.debug('Got 403, refreshing upload URLs');
                        uploadUrls = []; // Force a refresh of the upload URLs in the next iteration and retry this part
                    } else {
                        throw err;
                    }
                }
            }
            console.debug('Part successfully uploaded', partsUploaded + 1);
            partsUploaded++;
        }
        console.debug('Completing part upload');
        return this._completeUpload(bucketKey, objectKey, uploadKey, options?.contentType);
    }

    /**
     * Uploads content stream to a specific bucket object.
     *
     * @async
     * @param {string} bucketKey Bucket key.
     * @param {string} objectKey Name of uploaded object.
     * @param {AsyncIterable<Buffer>} input Input stream.
     * @param {object} [options] Additional upload options.
     * @param {string} [options.contentType] Optional content type of the uploaded file.
     * @param {number} [options.minutesExpiration] Custom expiration for the upload URLs
     * (within the 1 to 60 minutes range). If not specified, default is 2 minutes.
     * @returns {Promise<object>} Object description containing 'bucketKey', 'objectKey', 'objectId',
     * 'sha1', 'size', 'location', and 'contentType'.
     * @throws Error when the request fails, for example, due to insufficient rights, or incorrect scopes.
     */
    async uploadObjectStream(bucketKey, objectKey, input, options) {
        // Helper async generator making sure that each chunk has at least certain number of bytes
        async function* bufferChunks(input, minChunkSize) {
            let buffer = Buffer.alloc(2 * minChunkSize);
            let bytesRead = 0;
            for await (const chunk of input) {
                chunk.copy(buffer, bytesRead);
                bytesRead += chunk.byteLength;
                if (bytesRead >= minChunkSize) {
                    yield buffer.slice(0, bytesRead);
                    bytesRead = 0;
                }
            }
            if (bytesRead > 0) {
                yield buffer.slice(0, bytesRead);
            }
        }

        const MaxBatches = 25;
        const ChunkSize = 5 << 20;
        let partsUploaded = 0;
        let uploadUrls = [];
        let uploadKey;
        for await (const chunk of bufferChunks(input, ChunkSize)) {
            while (true) {
                console.debug('Uploading part', partsUploaded + 1);
                if (uploadUrls.length === 0) {
                    const uploadParams = await this._getUploadUrls(bucketKey, objectKey, MaxBatches, partsUploaded + 1, uploadKey, options?.minutesExpiration);
                    uploadUrls = uploadParams.urls.slice();
                    uploadKey = uploadParams.uploadKey;
                }
                const url = uploadUrls.shift();
                try {
                    await this.axios.put(url, chunk);
                    break;
                } catch (err) {
                    const status = err.response?.status;
                    if (status === 403) {
                        console.debug('Got 403, refreshing upload URLs');
                        uploadUrls = []; // Force a refresh of the upload URLs in the next iteration and retry this part
                    } else {
                        throw err;
                    }
                }
            }
            console.debug('Part successfully uploaded', partsUploaded + 1);
            partsUploaded++;
        }
        console.debug('Completing part upload');
        return this._completeUpload(bucketKey, objectKey, uploadKey, options?.contentType);
    }

    /**
     * Generates a signed URL that can be used to download a file from OSS.
     *
     * @async
     * @param {string} bucketKey Bucket key.
     * @param {string} objectKey Object key.
     * @param {number} [minutesExpiration] Custom expiration for the download URLs
     * (within the 1 to 60 minutes range). If not specified, default is 2 minutes.
     * @returns {Promise<object>} Download URLs and potentially other helpful information.
     */
    async _getDownloadUrl(bucketKey, objectKey, minutesExpiration) {
        let endpoint = `buckets/${bucketKey}/objects/${encodeURIComponent(objectKey)}/signeds3download`;
        if (minutesExpiration) {
            endpoint += `?minutesExpiration=${minutesExpiration}`;
        }
        const headers = {
            'Content-Type': 'application/json',
            'Authorization': 'Bearer ' + this.token
        };
        const resp = await this.axios.get(endpoint, { headers });
        return resp.data;
    }

    /**
     * Downloads a specific OSS object.
     *
     * @async
     * @param {string} bucketKey Bucket key.
     * @param {string} objectKey Object key.
     * @param {object} [options] Additional download options.
     * @param {number} [options.minutesExpiration] Custom expiration for the download URLs
     * (within the 1 to 60 minutes range). If not specified, default is 2 minutes.
     * @returns {Promise<ArrayBuffer>} Object content.
     * @throws Error when the request fails, for example, due to insufficient rights, or incorrect scopes.
     */
    async downloadObject(bucketKey, objectKey, options) {
        console.debug('Retrieving download URL');
        const downloadParams = await this._getDownloadUrl(bucketKey, objectKey, options?.minutesExpiration);
        if (downloadParams.status !== 'complete') {
            throw new Error('File not available for download yet.');
        }
        const resp = await this.axios.get(downloadParams.url, {
            responseType: 'arraybuffer',
            onDownloadProgress: progressEvent => {
                const downloadedBytes = progressEvent.currentTarget.response.length;
                const totalBytes = parseInt(progressEvent.currentTarget.responseHeaders['Content-Length']);
                console.debug('Downloaded', downloadedBytes, 'bytes of', totalBytes);
            }
        });
        return resp.data;
    }

    /**
     * Downloads content stream of a specific bucket object.
     *
     * @async
     * @param {string} bucketKey Bucket key.
     * @param {string} objectKey Object name.
     * @param {object} [options] Additional download options.
     * @param {number} [options.minutesExpiration] Custom expiration for the download URLs
     * (within the 1 to 60 minutes range). If not specified, default is 2 minutes.
     * @returns {Promise<ReadableStream>} Object content stream.
     * @throws Error when the request fails, for example, due to insufficient rights, or incorrect scopes.
     */
    async downloadObjectStream(bucketKey, objectKey, options) {
        console.debug('Retrieving download URL');
        const downloadParams = await this._getDownloadUrl(bucketKey, objectKey, options?.minutesExpiration);
        if (downloadParams.status !== 'complete') {
            throw new Error('File not available for download yet.');
        }
        const resp = await this.axios.get(downloadParams.url, {
            responseType: 'stream',
            onDownloadProgress: progressEvent => {
                const downloadedBytes = progressEvent.currentTarget.response.length;
                const totalBytes = parseInt(progressEvent.currentTarget.responseHeaders['Content-Length']);
                console.debug('Downloaded', downloadedBytes, 'bytes of', totalBytes);
            }
        });
        return resp.data;
    }
}

module.exports = {
    BinaryTransferClient
};
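Note that the utility file depends on the axios and retry-axios packages; assuming an npm-based project, they can be installed with npm install axios retry-axios.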

The default expiration time of the pre-signed URLs is 2 minutes; longer expiration times of up to 60 minutes can be requested using the minutesExpiration parameter.
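For example, to request download URLs that stay valid for 15 minutes, pass the option through the client methods. Below is a minimal sketch, following the same require('..') convention as the samples further down:

const { BinaryTransferClient } = require('..');

async function downloadWithLongerExpiration(bucketKey, objectKey, accessToken) {
    const client = new BinaryTransferClient(accessToken);
    // Ask for pre-signed URLs that expire after 15 minutes instead of the default 2
    return client.downloadObject(bucketKey, objectKey, { minutesExpiration: 15 });
}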

Download

Let's start with the download process. It takes two steps to download your files directly from AWS S3 using pre-signed URLs. Here is the pseudo code explaining how it works, followed by a minimal sketch of the raw HTTP calls.

  1. Generate a download URL using the GET buckets/:bucketKey/objects/:objectName/signeds3download endpoint
  2. Use the new URL to download the OSS object directly from AWS S3
    • Consider retrying (for example, with an exponential backoff) the download when the response code is 100-199, 429, or 500-599
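For clarity, here is a minimal sketch of those two steps as plain axios calls (retries and error handling are omitted; see the utility file above for the complete logic):

const axios = require('axios');

async function downloadDirect(bucketKey, objectKey, accessToken) {
    // Step 1: generate a signed download URL
    const signed = await axios.get(
        `https://developer.api.autodesk.com/oss/v2/buckets/${bucketKey}/objects/${encodeURIComponent(objectKey)}/signeds3download`,
        { headers: { 'Authorization': 'Bearer ' + accessToken } }
    );
    if (signed.data.status !== 'complete') {
        throw new Error('File not available for download yet.');
    }
    // Step 2: download the object content directly from AWS S3
    const resp = await axios.get(signed.data.url, { responseType: 'arraybuffer' });
    return resp.data;
}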

Here is how the code looks when downloading an object as a stream.

const fs = require('fs');
const { BinaryTransferClient } = require('..');

async function downloadStream(filePath, bucketKey, objectKey, accessToken) {
    const client = new BinaryTransferClient(accessToken);
    const stream = await client.downloadObjectStream(bucketKey, objectKey);
    stream.pipe(fs.createWriteStream(filePath));
}

if (process.argv.length < 6) {
    console.log('Usage:');
    console.log('node ' + __filename + ' <path to local file> <bucket key> <object key> <access token>');
    process.exit(0);
}

downloadStream(process.argv[2], process.argv[3], process.argv[4], process.argv[5])
    .catch(err => console.error(err));

And in case you would like to download an object to a local file (receiving the entire content into memory first), here is the code for it.

const fs = require('fs');
const { BinaryTransferClient } = require('..');

async function downloadBuffer(filePath, bucketKey, objectKey, accessToken) {
    const client = new BinaryTransferClient(accessToken);
    const buffer = await client.downloadObject(bucketKey, objectKey);
    fs.writeFileSync(filePath, buffer);
}

if (process.argv.length < 6) {
    console.log('Usage:');
    console.log('node ' + __filename + ' <path to local file> <bucket key> <object key> <access token>');
    process.exit(0);
}

downloadBuffer(process.argv[2], process.argv[3], process.argv[4], process.argv[5])
    .then(() => console.log('Done!'))
    .catch(err => console.error(err));

Upload

Let's now look at the upload process. It takes four steps to upload your files directly to AWS S3 using pre-signed URLs. Here is the pseudo code explaining how it works, followed by a minimal sketch of the raw HTTP calls.

  1. Calculate the number of parts of the file to upload
    • Note: each uploaded part except for the last one must be at least 5MB
  2. Generate up to 25 URLs for uploading specific parts of the file using the GET buckets/:bucketKey/objects/:objectKey/signeds3upload?firstPart=<index of first part>&parts=<number of parts> endpoint
    • The part numbers start with 1
    • For example, to generate upload URLs for parts 10 through 15, set firstPart to 10 and parts to 6
    • This endpoint also returns an uploadKey that is used later to request additional URLs or to finalize the upload
  3. Upload remaining parts of the file to their corresponding upload URLs
    • Consider retrying (for example, with an exponential backoff) individual uploads when the response code is 100-199, 429, or 500-599
    • If the response code is 403, the upload URLs have expired; go back to step #2
    • If you've used up all the upload URLs and there are still parts that must be uploaded, go back to step #2
  4. Finalize the upload using the POST buckets/:bucketKey/objects/:objectKey/signeds3upload endpoint, using the uploadKey value from step #2
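The same flow condensed into plain axios calls (a single-part upload for brevity; multi-part handling, retries, and URL refreshing are covered by the utility file above):

const axios = require('axios');

async function uploadDirect(bucketKey, objectKey, data, accessToken) {
    const headers = { 'Authorization': 'Bearer ' + accessToken };
    // Step 2: generate a signed upload URL (single part) and an upload key
    const { urls, uploadKey } = (await axios.get(
        `https://developer.api.autodesk.com/oss/v2/buckets/${bucketKey}/objects/${encodeURIComponent(objectKey)}/signeds3upload?parts=1&firstPart=1`,
        { headers }
    )).data;
    // Step 3: upload the content directly to AWS S3
    await axios.put(urls[0], data);
    // Step 4: finalize the upload using the upload key
    const resp = await axios.post(
        `https://developer.api.autodesk.com/oss/v2/buckets/${bucketKey}/objects/${encodeURIComponent(objectKey)}/signeds3upload`,
        { uploadKey },
        { headers }
    );
    return resp.data;
}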

Here is how the code looks when uploading a local file to an OSS bucket (as a stream).

const fs = require('fs');
const { BinaryTransferClient } = require('..');

async function uploadStream(filePath, bucketKey, objectKey, accessToken) {
    const client = new BinaryTransferClient(accessToken);
    const stream = fs.createReadStream(filePath);
    const object = await client.uploadObjectStream(bucketKey, objectKey, stream);
    return object;
}

if (process.argv.length < 6) {
    console.log('Usage:');
    console.log('node ' + __filename + ' <path to local file> <bucket key> <object key> <access token>');
    process.exit(0);
}

uploadStream(process.argv[2], process.argv[3], process.argv[4], process.argv[5])
    .then(obj => console.log(obj))
    .catch(err => console.error(err));

And in case you would like to upload a local file to an OSS bucket (loading the entire file into memory first), here is the code for it.

const fs = require('fs');
const { BinaryTransferClient } = require('..');

async function uploadBuffer(filePath, bucketKey, objectKey, accessToken) {
    const client = new BinaryTransferClient(accessToken);
    const buffer = fs.readFileSync(filePath);
    const object = await client.uploadObject(bucketKey, objectKey, buffer);
    return object;
}

if (process.argv.length < 6) {
    console.log('Usage:');
    console.log('node ' + __filename + ' <path to local file> <bucket key> <object key> <access token>');
    process.exit(0);
}

uploadBuffer(process.argv[2], process.argv[3], process.argv[4], process.argv[5])
    .then(obj => console.log(obj))
    .catch(err => console.error(err));

And let's not forget about uploading a local file to a Data Management hub (such as BIM 360, Fusion Team, or ACC).

const fs = require('fs');
const path = require('path');
const { ProjectsApi, FoldersApi, ItemsApi, VersionsApi } = require('forge-apis');
const { BinaryTransferClient } = require('..');

async function getFolderContents(projectId, folderId, getAccessToken) {
    const resp = await new FoldersApi().getFolderContents(projectId, folderId, {}, null, getAccessToken());
    return resp.body.data;
}

async function createStorage(projectId, folderId, displayName, getAccessToken) {
    const body = {
        jsonapi: {
            version: '1.0'
        },
        data: {
            type: 'objects',
            attributes: {
                name: displayName
            },
            relationships: {
                target: {
                    data: {
                        type: 'folders',
                        id: folderId
                    }
                }
            }
        }
    };
    const resp = await new ProjectsApi().postStorage(projectId, body, null, getAccessToken());
    return resp.body.data;
}

async function createItem(projectId, folderId, objectId, displayName, getAccessToken) {
    const body = {
        jsonapi: {
            version: '1.0'
        },
        data: {
            type: 'items',
            attributes: {
                displayName,
                extension: {
                    type: 'items:autodesk.core:File',
                    version: '1.0'
                }
            },
            relationships: {
                tip: {
                    data: {
                        type: 'versions',
                        id: '1'
                    }
                },
                parent: {
                    data: {
                        type: 'folders',
                        id: folderId
                    }
                }
            }
        },
        included: [
            {
                type: 'versions',
                id: '1',
                attributes: {
                    name: displayName,
                    extension: {
                        type: 'versions:autodesk.core:File',
                        version: '1.0'
                    }
                },
                relationships: {
                    storage: {
                        data: {
                            type: 'objects',
                            id: objectId
                        }
                    }
                }
            }
        ]
    };
    const resp = await new ItemsApi().postItem(projectId, body, null, getAccessToken());
    return resp.body.data;
}

async function createVersion(projectId, lineageId, objectId, displayName, getAccessToken) {
    const body = {
        jsonapi: {
            version: '1.0'
        },
        data: {
            type: 'versions',
            attributes: {
                name: displayName,
                extension: {
                    type: 'versions:autodesk.core:File',
                    version: '1.0'
                }
            },
            relationships: {
                item: {
                    data: {
                        type: 'items',
                        id: lineageId
                    }
                },
                storage: {
                    data: {
                        type: 'objects',
                        id: objectId
                    }
                }
            }
        }
    };
    const resp = await new VersionsApi().postVersion(projectId, body, null, getAccessToken());
    return resp.body.data;
}

async function upload(filePath, projectId, folderId, accessToken) {
    const displayName = path.basename(filePath);
    const getAccessToken = () => {
        return { access_token: accessToken };
    };

    console.log('Creating storage...');
    const storage = await createStorage(projectId, folderId, displayName, getAccessToken);
    console.log(storage);
    const match = /urn:adsk.objects:os.object:([^\/]+)\/(.+)/.exec(storage.id);
    if (!match || match.length < 3) {
        throw new Error('Unexpected storage ID: ' + storage.id);
    }
    const bucketKey = match[1];
    const objectKey = match[2];

    console.log('Uploading file...');
    const client = new BinaryTransferClient(accessToken);
    const object = await client.uploadObject(bucketKey, objectKey, fs.readFileSync(filePath));
    console.log(object);

    console.log('Checking if file already exists...');
    const contents = await getFolderContents(projectId, folderId, getAccessToken);
    const item = contents.find(e => e.type === 'items' && e.attributes.displayName === displayName);

    if (!item) {
        console.log('Creating new item...');
        const lineage = await createItem(projectId, folderId, object.objectId, displayName, getAccessToken);
        console.log(lineage);
    } else {
        console.log('Creating new item version...');
        const version = await createVersion(projectId, item.id, object.objectId, displayName, getAccessToken);
        console.log(version);
    }
}

if (process.argv.length < 6) {
    console.log('Usage:');
    console.log('node ' + __filename + ' <path to local file> <project id> <folder id> <access token>');
    process.exit(0);
}

upload(process.argv[2], process.argv[3], process.argv[4], process.argv[5])
    .then(() => console.log('Done!'))
    .catch(err => console.error(err));

Thank you again for all your business, and please feel free to reach out to us through forge.help@autodesk.com.
