Skip to content

Commit

Permalink
Merge pull request #4 from Banou26/refactor
Browse files Browse the repository at this point in the history
Refactor
  • Loading branch information
Banou26 authored Jan 30, 2024
2 parents f24f413 + 4cfa8f5 commit 8b4b701
Show file tree
Hide file tree
Showing 9 changed files with 570 additions and 524 deletions.
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,12 @@ dist/libav-wasm.js:
-s INITIAL_MEMORY=150mb \
-s TOTAL_MEMORY=125mb \
-s STACK_SIZE=50mb \
-s ALLOW_MEMORY_GROWTH=1 \
-s ASYNCIFY \
-s MODULARIZE=1 \
-g \
-gsource-map \
--source-map-base http://localhost:1234/dist/ \
-s ASSERTIONS=2 \
-lavcodec -lavformat -lavfilter -lavdevice -lswresample -lswscale -lavutil -lm -lx264 \
-o dist/libav.js \
Expand Down
8 changes: 4 additions & 4 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@
"types": "build/index.d.ts",
"type": "module",
"scripts": {
"dev": "npm i ./dist && concurrently \"npm run dev-wasm\" \"npm run dev-web\"",
"dev": "npm i ./dist && concurrently \"npm run dev-wasm\" \"npm run dev-web\" \"npm run worker-build-dev\"",
"dev-wasm": "nodemon -e cpp --exec \"npm run make-docker\"",
"build-worker": "vite build --config vite-worker.config.ts",
"make-docker": "docker-compose run libav-wasm make",
"dev-web": "vite --port 1234",
"vite-build": "vite build",
"worker-build-dev": "nodemon -e ts --watch src/worker --exec \"npm run build-worker\"",
"build": "npm run copy-package && npm run make-docker && npm i ./dist && vite build && npm run build-worker && npm run types && npm run copy-wasm && npm remove libav",
"types": "tsc",
"copy-package": "copyfiles -u 2 ./src/build-config/package.json dist",
Expand All @@ -24,7 +25,7 @@
"@rollup/plugin-commonjs": "^24.0.1",
"concurrently": "^7.6.0",
"copyfiles": "^2.4.1",
"nodemon": "^2.0.20",
"nodemon": "^2.0.22",
"shx": "^0.3.4",
"typescript": "^4.9.4",
"vite": "^4.0.4"
Expand Down
163 changes: 84 additions & 79 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,15 @@
import type { Resolvers as WorkerResolvers } from './worker'

import PQueue from 'p-queue'
import { call } from 'osra'

import { SEEK_FLAG, SEEK_WHENCE_FLAG } from './utils'

export {
SEEK_FLAG,
SEEK_WHENCE_FLAG
}

export type MakeTransmuxerOptions = {
/** Path that will be used to locate the .wasm file imported from the worker */
publicPath: string
/** Path that will be used to locate the javascript worker file */
workerUrl: string
workerOptions?: WorkerOptions
read: (offset: number, size: number) => Promise<ArrayBuffer>
seek: (currentOffset: number, offset: number, whence: SEEK_WHENCE_FLAG) => Promise<number>
randomRead: (offset: number, size: number) => Promise<ArrayBuffer>
getStream: (offset: number) => Promise<ReadableStream<Uint8Array>>
subtitle: (title: string, language: string, data: string) => Promise<void> | void
attachment: (filename: string, mimetype: string, buffer: ArrayBuffer) => Promise<void> | void
write: (params: {
Expand Down Expand Up @@ -77,8 +69,8 @@ export const makeTransmuxer = async ({
publicPath,
workerUrl,
workerOptions,
read: _read,
seek: _seek,
randomRead: _randomRead,
getStream: _getStream,
write: _write,
attachment,
subtitle: _subtitle,
Expand All @@ -99,15 +91,16 @@ export const makeTransmuxer = async ({

const target = call<WorkerResolvers>(worker)

const apiQueue = new PQueue()

const addTask = <T extends (...args: any) => any>(func: T) =>
apiQueue.add<Awaited<ReturnType<T>>>(func)

const subtitles = new Map<number, Subtitle>()
let lastChunk: Chunk | undefined

const { init: _workerInit, destroy: _workerDestroy, process: _workerProcess, seek: _workerSeek, getInfo: _getInfo } =
let currentStream: ReadableStream<Uint8Array> | undefined
let reader: ReadableStreamDefaultReader<Uint8Array> | undefined

let streamResultPromiseResolve: (value: { value: ArrayBuffer | undefined, done: boolean, cancelled: boolean }) => void
let streamResultPromiseReject: (reason?: any) => void
let streamResultPromise: Promise<{ value: ArrayBuffer | undefined, done: boolean, cancelled: boolean }>

const { init: workerInit, destroy: workerDestroy, read: workerRead, seek: workerSeek, getInfo: getInfo } =
await target(
'init',
{
Expand Down Expand Up @@ -155,82 +148,94 @@ export const makeTransmuxer = async ({
_subtitle(subtitle.title, subtitle.language, subtitleString)
},
attachment: async (filename, mimetype, buffer) => attachment(filename, mimetype, buffer),
read: (offset, bufferSize) => _read(offset, bufferSize),
seek: (currentOffset, offset, whence) => _seek(currentOffset, offset, whence),
write: async ({
randomRead: (offset, bufferSize) => _randomRead(offset, bufferSize),
streamRead: async (offset: number) => {
if (!currentStream) {
currentStream = await _getStream(offset)
reader = currentStream.getReader()
}

streamResultPromise = new Promise<{ value: ArrayBuffer | undefined, done: boolean, cancelled: boolean }>((resolve, reject) => {
streamResultPromiseResolve = resolve
streamResultPromiseReject = reject
})

const tryReading = (): Promise<void> | undefined =>
reader
?.read()
.then(result => ({
value: result.value?.buffer,
done: result.value === undefined,
cancelled: false
}))
.then(async (result) => {
if (result.done) {
reader?.cancel()
if (offset >= length) {
return streamResultPromiseResolve(result)
}
currentStream = await _getStream(offset)
reader = currentStream.getReader()
return tryReading()
}

return streamResultPromiseResolve(result)
})
.catch((err) => streamResultPromiseReject(err))

tryReading()

return (
streamResultPromise
.then((value) => ({
value: value.value,
done: value.done,
cancelled: false
}))
.catch(err => {
console.error(err)
return {
value: undefined,
done: false,
cancelled: true
}
})
)
},
clearStream: async () => {
reader?.cancel()
currentStream = undefined
reader = undefined
},
write: ({
isHeader,
offset,
arrayBuffer,
position,
pts,
duration
}) => {
const chunk = {
isHeader,
offset,
buffer: new Uint8Array(arrayBuffer),
pts,
duration,
pos: position
}

if (!isHeader) {
lastChunk = chunk
processBufferChunks.push(chunk)
}

await _write(chunk)
}
}) => _write({
isHeader,
offset,
buffer: new Uint8Array(arrayBuffer),
pts,
duration,
pos: position
})
}
)

const workerQueue = new PQueue({ concurrency: 1 })

const addWorkerTask = <T extends (...args: any[]) => any>(func: T) =>
(...args: Parameters<T>) =>
workerQueue.add<Awaited<ReturnType<T>>>(() => func(...args))

const workerInit = addWorkerTask(_workerInit)
const workerDestroy = addWorkerTask(_workerDestroy)
const workerProcess = addWorkerTask(_workerProcess)
const workerSeek = addWorkerTask(_workerSeek)
const getInfo = addWorkerTask(_getInfo)

let processBufferChunks: Chunk[] = []

const result = {
init: () => addTask(async () => {
processBufferChunks = []
await workerInit()
}),
init: () => workerInit(),
destroy: (destroyWorker = false) => {
if (destroyWorker) {
worker.terminate()
return
}
return addTask(() => workerDestroy())
},
process: (timeToProcess: number) => addTask(async () => {
processBufferChunks = []
await workerProcess(timeToProcess)
const writtenChunks = processBufferChunks
processBufferChunks = []
return writtenChunks
}),
seek: (time: number) => {
return addTask(async () => {
// if (lastChunk && (lastChunk.pts > time)) {
// await workerDestroy()
// processBufferChunks = []
// await workerInit()
// }
processBufferChunks = []
await workerSeek(
Math.max(0, time) * 1000,
SEEK_FLAG.NONE
)
})
return workerDestroy()
},
read: () => workerRead(),
seek: (time: number) => workerSeek(Math.max(0, time) * 1000),
getInfo: () => getInfo() as Promise<{ input: MediaInfo, output: MediaInfo }>
}

Expand Down
Loading

0 comments on commit 8b4b701

Please sign in to comment.