Server IP : 162.213.251.212 / Your IP : 3.17.167.39 [ Web Server : LiteSpeed System : Linux business55.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64 User : allssztx ( 535) PHP Version : 8.1.31 Disable Function : NONE Domains : 1 Domains MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : ON | Sudo : OFF | Pkexec : OFF Directory : /home/allssztx/nodevenv/needapair.com/20/lib/node_modules/mongodb/src/gridfs/ |
Upload File : |
import { Writable } from 'stream'; import { type Document, ObjectId } from '../bson'; import type { Collection } from '../collection'; import { MongoAPIError, MONGODB_ERROR_CODES, MongoError } from '../error'; import { type Callback, squashError } from '../utils'; import type { WriteConcernOptions } from '../write_concern'; import { WriteConcern } from './../write_concern'; import type { GridFSFile } from './download'; import type { GridFSBucket } from './index'; /** @public */ export interface GridFSChunk { _id: ObjectId; files_id: ObjectId; n: number; data: Buffer | Uint8Array; } /** @public */ export interface GridFSBucketWriteStreamOptions extends WriteConcernOptions { /** Overwrite this bucket's chunkSizeBytes for this file */ chunkSizeBytes?: number; /** Custom file id for the GridFS file. */ id?: ObjectId; /** Object to store in the file document's `metadata` field */ metadata?: Document; /** * String to store in the file document's `contentType` field. * @deprecated Will be removed in the next major version. Add a contentType field to the metadata document instead. */ contentType?: string; /** * Array of strings to store in the file document's `aliases` field. * @deprecated Will be removed in the next major version. Add an aliases field to the metadata document instead. */ aliases?: string[]; /** @internal TODO(NODE-5688): make this public */ timeoutMS?: number; } /** * A writable stream that enables you to write buffers to GridFS. * * Do not instantiate this class directly. Use `openUploadStream()` instead. * @public */ export class GridFSBucketWriteStream extends Writable { bucket: GridFSBucket; /** A Collection instance where the file's chunks are stored */ chunks: Collection<GridFSChunk>; /** A Collection instance where the file's GridFSFile document is stored */ files: Collection<GridFSFile>; /** The name of the file */ filename: string; /** Options controlling the metadata inserted along with the file */ options: GridFSBucketWriteStreamOptions; /** Indicates the stream is finished uploading */ done: boolean; /** The ObjectId used for the `_id` field on the GridFSFile document */ id: ObjectId; /** The number of bytes that each chunk will be limited to */ chunkSizeBytes: number; /** Space used to store a chunk currently being inserted */ bufToStore: Buffer; /** Accumulates the number of bytes inserted as the stream uploads chunks */ length: number; /** Accumulates the number of chunks inserted as the stream uploads file contents */ n: number; /** Tracks the current offset into the buffered bytes being uploaded */ pos: number; /** Contains a number of properties indicating the current state of the stream */ state: { /** If set the stream has ended */ streamEnd: boolean; /** Indicates the number of chunks that still need to be inserted to exhaust the current buffered data */ outstandingRequests: number; /** If set an error occurred during insertion */ errored: boolean; /** If set the stream was intentionally aborted */ aborted: boolean; }; /** The write concern setting to be used with every insert operation */ writeConcern?: WriteConcern; /** * The document containing information about the inserted file. * This property is defined _after_ the finish event has been emitted. * It will remain `null` if an error occurs. * * @example * ```ts * fs.createReadStream('file.txt') * .pipe(bucket.openUploadStream('file.txt')) * .on('finish', function () { * console.log(this.gridFSFile) * }) * ``` */ gridFSFile: GridFSFile | null = null; /** * @param bucket - Handle for this stream's corresponding bucket * @param filename - The value of the 'filename' key in the files doc * @param options - Optional settings. * @internal */ constructor(bucket: GridFSBucket, filename: string, options?: GridFSBucketWriteStreamOptions) { super(); options = options ?? {}; this.bucket = bucket; this.chunks = bucket.s._chunksCollection; this.filename = filename; this.files = bucket.s._filesCollection; this.options = options; this.writeConcern = WriteConcern.fromOptions(options) || bucket.s.options.writeConcern; // Signals the write is all done this.done = false; this.id = options.id ? options.id : new ObjectId(); // properly inherit the default chunksize from parent this.chunkSizeBytes = options.chunkSizeBytes || this.bucket.s.options.chunkSizeBytes; this.bufToStore = Buffer.alloc(this.chunkSizeBytes); this.length = 0; this.n = 0; this.pos = 0; this.state = { streamEnd: false, outstandingRequests: 0, errored: false, aborted: false }; if (!this.bucket.s.calledOpenUploadStream) { this.bucket.s.calledOpenUploadStream = true; checkIndexes(this).then(() => { this.bucket.s.checkedIndexes = true; this.bucket.emit('index'); }, squashError); } } /** * @internal * * The stream is considered constructed when the indexes are done being created */ override _construct(callback: (error?: Error | null) => void): void { if (this.bucket.s.checkedIndexes) { return process.nextTick(callback); } this.bucket.once('index', callback); } /** * @internal * Write a buffer to the stream. * * @param chunk - Buffer to write * @param encoding - Optional encoding for the buffer * @param callback - Function to call when the chunk was added to the buffer, or if the entire chunk was persisted to MongoDB if this chunk caused a flush. */ override _write( chunk: Buffer | string, encoding: BufferEncoding, callback: Callback<void> ): void { doWrite(this, chunk, encoding, callback); } /** @internal */ override _final(callback: (error?: Error | null) => void): void { if (this.state.streamEnd) { return process.nextTick(callback); } this.state.streamEnd = true; writeRemnant(this, callback); } /** * Places this write stream into an aborted state (all future writes fail) * and deletes all chunks that have already been written. */ async abort(): Promise<void> { if (this.state.streamEnd) { // TODO(NODE-3485): Replace with MongoGridFSStreamClosed throw new MongoAPIError('Cannot abort a stream that has already completed'); } if (this.state.aborted) { // TODO(NODE-3485): Replace with MongoGridFSStreamClosed throw new MongoAPIError('Cannot call abort() on a stream twice'); } this.state.aborted = true; await this.chunks.deleteMany({ files_id: this.id }); } } function handleError(stream: GridFSBucketWriteStream, error: Error, callback: Callback): void { if (stream.state.errored) { process.nextTick(callback); return; } stream.state.errored = true; process.nextTick(callback, error); } function createChunkDoc(filesId: ObjectId, n: number, data: Buffer): GridFSChunk { return { _id: new ObjectId(), files_id: filesId, n, data }; } async function checkChunksIndex(stream: GridFSBucketWriteStream): Promise<void> { const index = { files_id: 1, n: 1 }; let indexes; try { indexes = await stream.chunks.listIndexes().toArray(); } catch (error) { if (error instanceof MongoError && error.code === MONGODB_ERROR_CODES.NamespaceNotFound) { indexes = []; } else { throw error; } } const hasChunksIndex = !!indexes.find(index => { const keys = Object.keys(index.key); if (keys.length === 2 && index.key.files_id === 1 && index.key.n === 1) { return true; } return false; }); if (!hasChunksIndex) { await stream.chunks.createIndex(index, { ...stream.writeConcern, background: true, unique: true }); } } function checkDone(stream: GridFSBucketWriteStream, callback: Callback): void { if (stream.done) { return process.nextTick(callback); } if (stream.state.streamEnd && stream.state.outstandingRequests === 0 && !stream.state.errored) { // Set done so we do not trigger duplicate createFilesDoc stream.done = true; // Create a new files doc const gridFSFile = createFilesDoc( stream.id, stream.length, stream.chunkSizeBytes, stream.filename, stream.options.contentType, stream.options.aliases, stream.options.metadata ); if (isAborted(stream, callback)) { return; } stream.files.insertOne(gridFSFile, { writeConcern: stream.writeConcern }).then( () => { stream.gridFSFile = gridFSFile; callback(); }, error => handleError(stream, error, callback) ); return; } process.nextTick(callback); } async function checkIndexes(stream: GridFSBucketWriteStream): Promise<void> { const doc = await stream.files.findOne({}, { projection: { _id: 1 } }); if (doc != null) { // If at least one document exists assume the collection has the required index return; } const index = { filename: 1, uploadDate: 1 }; let indexes; try { indexes = await stream.files.listIndexes().toArray(); } catch (error) { if (error instanceof MongoError && error.code === MONGODB_ERROR_CODES.NamespaceNotFound) { indexes = []; } else { throw error; } } const hasFileIndex = !!indexes.find(index => { const keys = Object.keys(index.key); if (keys.length === 2 && index.key.filename === 1 && index.key.uploadDate === 1) { return true; } return false; }); if (!hasFileIndex) { await stream.files.createIndex(index, { background: false }); } await checkChunksIndex(stream); } function createFilesDoc( _id: ObjectId, length: number, chunkSize: number, filename: string, contentType?: string, aliases?: string[], metadata?: Document ): GridFSFile { const ret: GridFSFile = { _id, length, chunkSize, uploadDate: new Date(), filename }; if (contentType) { ret.contentType = contentType; } if (aliases) { ret.aliases = aliases; } if (metadata) { ret.metadata = metadata; } return ret; } function doWrite( stream: GridFSBucketWriteStream, chunk: Buffer | string, encoding: BufferEncoding, callback: Callback<void> ): void { if (isAborted(stream, callback)) { return; } const inputBuf = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding); stream.length += inputBuf.length; // Input is small enough to fit in our buffer if (stream.pos + inputBuf.length < stream.chunkSizeBytes) { inputBuf.copy(stream.bufToStore, stream.pos); stream.pos += inputBuf.length; process.nextTick(callback); return; } // Otherwise, buffer is too big for current chunk, so we need to flush // to MongoDB. let inputBufRemaining = inputBuf.length; let spaceRemaining: number = stream.chunkSizeBytes - stream.pos; let numToCopy = Math.min(spaceRemaining, inputBuf.length); let outstandingRequests = 0; while (inputBufRemaining > 0) { const inputBufPos = inputBuf.length - inputBufRemaining; inputBuf.copy(stream.bufToStore, stream.pos, inputBufPos, inputBufPos + numToCopy); stream.pos += numToCopy; spaceRemaining -= numToCopy; let doc: GridFSChunk; if (spaceRemaining === 0) { doc = createChunkDoc(stream.id, stream.n, Buffer.from(stream.bufToStore)); ++stream.state.outstandingRequests; ++outstandingRequests; if (isAborted(stream, callback)) { return; } stream.chunks.insertOne(doc, { writeConcern: stream.writeConcern }).then( () => { --stream.state.outstandingRequests; --outstandingRequests; if (!outstandingRequests) { checkDone(stream, callback); } }, error => handleError(stream, error, callback) ); spaceRemaining = stream.chunkSizeBytes; stream.pos = 0; ++stream.n; } inputBufRemaining -= numToCopy; numToCopy = Math.min(spaceRemaining, inputBufRemaining); } } function writeRemnant(stream: GridFSBucketWriteStream, callback: Callback): void { // Buffer is empty, so don't bother to insert if (stream.pos === 0) { return checkDone(stream, callback); } ++stream.state.outstandingRequests; // Create a new buffer to make sure the buffer isn't bigger than it needs // to be. const remnant = Buffer.alloc(stream.pos); stream.bufToStore.copy(remnant, 0, 0, stream.pos); const doc = createChunkDoc(stream.id, stream.n, remnant); // If the stream was aborted, do not write remnant if (isAborted(stream, callback)) { return; } stream.chunks.insertOne(doc, { writeConcern: stream.writeConcern }).then( () => { --stream.state.outstandingRequests; checkDone(stream, callback); }, error => handleError(stream, error, callback) ); } function isAborted(stream: GridFSBucketWriteStream, callback: Callback<void>): boolean { if (stream.state.aborted) { process.nextTick(callback, new MongoAPIError('Stream has been aborted')); return true; } return false; }