using concurrent::Actor
using concurrent::ActorPool
using concurrent::AtomicRef
using concurrent::AtomicBool
using afConcurrent::Synchronized
using afConcurrent::SynchronizedState
** Utility class that attaches to the drone's 'onVideoFrame()' event.
** Example usage:
**
**   syntax: fantom
**   VideoStreamer.toMp4File(`droneStunts.mp4`).attachToLiveStream(drone)
**
** Note some methods start and call out to an external [FFmpeg]`https://ffmpeg.org/` process for
** video processing. Whilst this works for the limited functionality of these methods, this class
** is not meant to be an all-encompassing FFmpeg wrapper.
**
** For ease of use, just put 'ffmpeg' in the same directory as your program.
const class VideoStreamer {
    // FIXME move all data to sync state to prevent race conditions
    private const SynchronizedState mutex
    private const AtomicRef         pngImageRef          := AtomicRef(null)
    private const AtomicRef         onPngImageRef        := AtomicRef(null)
    private const AtomicRef         droneRef             := AtomicRef(null)
    private const AtomicRef         oldVideoFrameHookRef := AtomicRef(null)
    private const AtomicRef         oldDisconnectHookRef := AtomicRef(null)
    private const AtomicBool        attachedRef          := AtomicBool()
    private const AtomicBool        recordStreamRef      := AtomicBool()
    private const Synchronized?     pngEventThread

    ** The 'onVideoFrame()' listener for this streamer, should you wish to attach / call it yourself
    const |Buf, PaveHeader, Drone| onVideoFrameListener := #onVideoFrame.func.bind([this])

    ** The 'onDisconnect()' listener for this streamer, should you wish to attach / call it yourself
    const |Bool, Drone| onDisconnectListener := #onDisconnect.func.bind([this])

    ** The output file that the video is saved to.
    **
    ** Only available if this 'VideoStreamer' instance was initialised via [toRawFile()]`VideoStreamer.toRawFile` or
    ** [toMp4File()]`VideoStreamer.toMp4File`.
    const File? file

    ** Returns the latest PNG image from the video stream.
    **
    ** Only available if this 'VideoStreamer' instance was initialised via [toPngImages()]`VideoStreamer.toPngImages`.
    Buf? pngImage() {
        pngImageRef.val
    }

    ** Event hook that's called when a new PNG image becomes available.
    **
    ** Only available if this 'VideoStreamer' instance was initialised via [toPngImages()]`VideoStreamer.toPngImages`.
    **
    ** Throws 'NotImmutableErr' if the function is not immutable.
    ** Note this hook is called from a different Actor / thread to the one that sets it.
    |Buf|? onPngImage {
        get { onPngImageRef.val }
        set { onPngImageRef.val = it?.toImmutable }
    }

    private new make(|This| f) { f(this) }

    ** Saves raw video frames to the given file. Example:
    **
    **   syntax: fantom
    **   VideoStreamer.toRawFile(`droneStunts.h264`).attachToLiveStream(drone)
    **
    ** Note the video consists of raw H.264 codec frames and is not readily readable by anything
    ** (with the exception of VLC, try saving the file with a '.h264' extension.)
    **
    ** To wrap the file in a more usable '.mp4' container, try the following:
    **
    **   C:\> ffmpeg -f h264 -i droneStunts.h264 -f mp4 -codec:v copy droneStunts.mp4
    **
    ** The input URI is given a unique numerical suffix to prevent file from being overwritten.
    ** Example, 'drone.h264' may become 'drone-001F.h264'. See the [file]`#file` field to acquire the
    ** actual file created.
    **
    ** Throws 'ArgErr' if 'outputFile' is a directory URI.
    **
    ** Available options:
    ** pre>
    **   actorPool : (ActorPool) the ActorPool to use for thread processing
    **               Defaults to a new instance.
    ** <pre
    static VideoStreamer toRawFile(Uri outputFile, [Str:Obj]? options := null) {
        if (outputFile.isDir)
            throw ArgErr("Output file is a directory: ${outputFile}")

        actorPool := options?.get("actorPool") ?: ActorPool()

        return VideoStreamer {
            it.file = addFileSuffix(outputFile)
            // copy to a local so the state factory below does not capture the streamer itself
            itFile := it.file
            it.mutex = SynchronizedState(actorPool) |->Obj| {
                VideoStreamerToH264File(itFile, false)
            }
        }
    }

    ** Saves the video stream as the given MP4 file. Example:
    **
    **   syntax: fantom
    **   VideoStreamer.toMp4File(`droneStunts.mp4`).attachToLiveStream(drone)
    **
    ** Note this method starts an external FFmpeg process and pipes the raw video frames to it.
    ** FFmpeg then wraps the video in an MP4 container and writes the file.
    **
    ** The input URI is given a unique numerical suffix to prevent file from being overwritten.
    ** Example, 'drone.mp4' may become 'drone-001F.mp4'. See the `file` field to acquire the
    ** actual file created.
    **
    ** Throws 'ArgErr' if 'outputFile' is a directory URI.
    **
    ** Available options:
    ** pre>
    **   ffmpegPath : (Uri) the location of the FFmpeg executable.
    **                Defaults to `ffmpeg.exe` on Windows, 'ffmpeg' otherwise.
    **
    **   ffmpegArgs : (Str[]) an array of arguments for FFmpeg.
    **                Defaults to "-f h264 -i - -f mp4 -codec:v copy -movflags +faststart".split
    **
    **   quiet      : (Bool) if true, then FFmpeg console output is suppressed.
    **                Defaults to false.
    **
    **   throttle   : (Duration) throttle applied when piping FFmpeg console output.
    **                Defaults to 200ms.
    **
    **   actorPool  : (ActorPool) the ActorPool to use for thread processing
    **                Defaults to a new instance.
    ** <pre
    **
    ** If no 'ffmpegPath' is given then FFmpeg is assumed to be in the current directory.
    static VideoStreamer toMp4File(Uri outputFile, [Str:Obj]? options := null) {
        if (outputFile.isDir)
            throw ArgErr("Output file is a directory: ${outputFile}")

        actorPool  := options?.get("actorPool")  ?: ActorPool() { it.name = typeof.name }
        ffmpegPath := options?.get("ffmpegPath") ?: (Env.cur.os == "win32" ? `ffmpeg.exe` : `ffmpeg`)
        ffmpegArgs := options?.get("ffmpegArgs") ?: "-f h264 -i - -f mp4 -codec:v copy -movflags +faststart".split
        quiet      := options?.get("quiet")      ?: false
        throttle   := options?.get("throttle")   ?: 200ms // 5 times a second should be fine for console output

        return VideoStreamer {
            itFile := addFileSuffix(outputFile)
            it.file = itFile
            it.mutex = SynchronizedState(actorPool) |->Obj| {
                VideoStreamerToMp4File(ffmpegPath, ffmpegArgs, itFile, actorPool, throttle, quiet)
            }
        }
    }

    ** Converts the video stream into a stream of PNG images. Example:
    **
    **   syntax: fantom
    **   vs := VideoStreamer.toPngImages.attachToLiveStream(drone)
    **   vs.onPngImage = |Buf pngBuf| {
    **       echo("Got new image of size ${pngBuf.size}")
    **   }
    **
    ** The `pngImage` field always contains the latest PNG image data.
    **
    ** Use the [onPngImage()]`onPngImage` hook to be notified of new images.
    **
    ** Note this method starts an external FFmpeg process and pipes the raw video frames to it.
    ** FFmpeg converts the video to PNG images and pipes it back to the 'VideoStreamer' instance.
    **
    ** Available options:
    ** pre>
    **   ffmpegPath : (Uri) the location of the FFmpeg executable.
    **                Defaults to `ffmpeg.exe` on Windows, 'ffmpeg' otherwise.
    **
    **   ffmpegArgs : (Str[]) an array of arguments for FFmpeg.
    **                Defaults to "-f h264 -i - -f image2pipe -codec:v png -".split
    **
    **   frameRate  : (Int?) The frame rate FFmpeg should enforce.
    **                Defaults to null - same as video input.
    **
    **   quiet      : (Bool) if true, then FFmpeg console output is suppressed.
    **
    **   throttle   : (Duration) throttle applied when piping FFmpeg console output.
    **                Defaults to 200ms.
    **
    **   actorPool  : (ActorPool) the ActorPool to use for thread processing
    **                Defaults to a new instance.
    ** <pre
    **
    ** If no 'ffmpegPath' is given then FFmpeg is assumed to be in the current directory.
    static VideoStreamer toPngImages([Str:Obj]? options := null) {
        actorPool  := options?.get("actorPool")  ?: ActorPool()
        ffmpegPath := options?.get("ffmpegPath") ?: (Env.cur.os == "win32" ? `ffmpeg.exe` : `ffmpeg`)
        ffmpegArgs := options?.get("ffmpegArgs") ?: "-f h264 -i - -f image2pipe -codec:v png -".split
        frameRate  := options?.get("frameRate")
        quiet      := options?.get("quiet")      ?: false
        throttle   := options?.get("throttle")   ?: 200ms // 5 times a second should be fine for console output

        args := (Str[]) ffmpegArgs
        if (frameRate != null) {
            // swap the trailing "-" (pipe output) for "-r <rate> -"
            args = args.rw
            args.pop
            args.push("-r")
            // the option is documented as an Int, but 'args' is a Str[] - so convert it explicitly
            args.push(frameRate.toStr)
            args.push("-")
        }
        iArgs := args.toImmutable

        return VideoStreamer {
            vs := it
            it.pngEventThread = Synchronized(actorPool)
            it.mutex = SynchronizedState(actorPool) |->Obj| {
                VideoStreamerToPngEvents(ffmpegPath, iArgs, actorPool, throttle, quiet, #onNewPngImage.func.bind([vs]))
            }
        }
    }

    ** Attaches this instance to the given drone and starts stream processing.
    **
    ** This method sets new 'onVideoFrame' and 'onDisconnect' hooks on the drone, but wraps any
    ** existing hooks. Meaning any hooks that were set previously, still get called.
    **
    ** Throws 'Err' if this instance is already attached to a drone.
    This attachToLiveStream(Drone drone) {
        attach(drone, false)
    }

    ** Attaches this instance to the given drone and starts stream processing.
    **
    ** This method sets new 'onRecordFrame' and 'onDisconnect' hooks on the drone, but wraps any
    ** existing hooks. Meaning any hooks that were set previously, still get called.
    **
    ** Ensure you call 'Drone.startRecording()' before calling this.
    **
    ** Throws 'Err' if this instance is already attached to a drone.
    @NoDoc
    This attachToRecordStream(Drone drone) {
        attach(drone, true)
    }

    ** Reverts the frame and 'onDisconnect' hooks to what they were before 'attachToLiveStream()'
    ** (or 'attachToRecordStream()') was called.
    ** Closes files and halts stream processing.
    **
    ** Does nothing if this instance is not currently attached.
    Void detach() {
        onDisconnect(false)
    }

    ** Shared implementation of the 'attachToXxxStream()' methods.
    **
    ** 'recordStream' selects which drone hook is wrapped: 'onRecordFrame' when true,
    ** 'onVideoFrame' when false.
    private This attach(Drone drone, Bool recordStream) {
        // claim the drone slot atomically so two racing attach calls can't both succeed
        if (!droneRef.compareAndSet(null, drone))
            throw Err("Already attached to ${droneRef.val}")
        recordStreamRef.val = recordStream

        oldVideoFrameHookRef.val = recordStream ? drone.onRecordFrame : drone.onVideoFrame
        frameHook := |Buf payload, PaveHeader pave, Drone dron| {
            onVideoFrameListener(payload, pave, dron)
            ((Func?) oldVideoFrameHookRef.val)?.call(payload, pave, dron)
        }
        if (recordStream)
            drone.onRecordFrame = frameHook
        else
            drone.onVideoFrame = frameHook

        oldDisconnectHookRef.val = drone.onDisconnect
        drone.onDisconnect = |Bool abnormal, Drone dron| {
            onDisconnectListener(abnormal, dron)
            ((Func?) oldDisconnectHookRef.val)?.call(abnormal, dron)
        }

        try {
            // fire it up! Sync so we can receive any ctor errors
            mutex.sync |->| { }
        } catch (Err err) {
            // start up failed - don't leave the drone wired to a streamer that never started,
            // 'detach()' can't undo this as 'attachedRef' has not been set
            restoreDroneHooks(drone)
            droneRef.val = null
            throw err
        }

        attachedRef.val = true
        return this
    }

    ** Puts back the hooks that were on the drone before this streamer wrapped them.
    private Void restoreDroneHooks(Drone drone) {
        if (recordStreamRef.val)
            drone.onRecordFrame = oldVideoFrameHookRef.val
        else
            drone.onVideoFrame = oldVideoFrameHookRef.val
        drone.onDisconnect = oldDisconnectHookRef.val
    }

    ** Hands the video frame to the stream implementation, unless the ActorPool has been stopped.
    private Void onVideoFrame(Buf payload, PaveHeader pave) {
        if (!mutex.lock.actor.pool.isStopped)
            mutex.withState |VideoStreamerImpl state| {
                state.onVideoFrame(payload, pave)
            }
    }

    ** Shuts down the stream implementation and reverts the drone hooks.
    private Void onDisconnect(Bool abnormal) {
        // flip the flag atomically so only the first caller tears down; a repeated 'detach()'
        // would otherwise close the stream again and clobber any hooks set on the drone since
        if (!attachedRef.compareAndSet(true, false)) return

        if (!mutex.lock.actor.pool.isStopped)
            mutex.getState |VideoStreamerImpl state| {
                state.onDisconnect(abnormal)
            }

        // revert the drone hooks
        restoreDroneHooks((Drone) droneRef.val)

        // only stop *our* ActorPools
        // NOTE(review): pools created by toRawFile() / toPngImages() are not given a name, and the
        // name given in toMp4File() is evaluated inside an ActorPool it-block - confirm this
        // check matches the pools it is meant to
        if (mutex.lock.actor.pool.name == typeof.name)
            mutex.lock.actor.pool.stop
    }

    ** Stores the latest PNG image and fires the 'onPngImage' hook on the event thread.
    private Void onNewPngImage(Buf buf) {
        pngImageRef.val = buf
        pngEventThread.async |->| {
            callSafe(onPngImage, [buf])
        }
    }

    ** Returns the first non-existing file named '<basename>-XXXX.<ext>' (XXXX being an upper case
    ** hex counter starting at '0000') in the same directory as the given location.
    private File addFileSuffix(Uri fileLocation) {
        origFile := fileLocation.toFile.normalize
        // a file with no extension must not end up named 'xxx-0000.null'
        ext   := origFile.ext == null ? "" : ".${origFile.ext}"
        index := 0
        file  := null as File
        while (file == null || file.exists) {
            file = origFile.parent + (`${origFile.basename}-${index.toHex(4).upper}${ext}`)
            index++
        }
        return file
    }

    ** Calls the given func (if not null), tracing any error rather than letting it propagate.
    private Void callSafe(Func? f, Obj[]? args) {
        try f?.callList(args)
        catch (Err err)
            err.trace
    }
}
** Contract for the stream implementations that 'VideoStreamer' delegates to.
internal mixin VideoStreamerImpl {

    ** Called with each video frame received from the drone.
    abstract Void onVideoFrame(Buf payload, PaveHeader pave)

    ** Called when the streamer is detached or the drone disconnects.
    abstract Void onDisconnect(Bool abnormal)

}
** Stream implementation that writes the raw frame payloads straight to a file.
internal class VideoStreamerToH264File : VideoStreamerImpl {
    private static const Log log := VideoStreamer#.pod.log

    ** The stream that frames are written to.
    OutStream out

    ** Opens the given file for writing, appending to existing content if 'append' is true.
    new make(File file, Bool append) {
        this.out = file.out(append)
        osPath  := file.normalize.osPath
        log.info("Streaming video to ${osPath}")
    }

    ** Writes the frame payload and flushes it to the file.
    override Void onVideoFrame(Buf payload, PaveHeader pave) {
        out.writeBuf(payload)
        out.flush
    }

    ** Flushes and closes the file.
    override Void onDisconnect(Bool abnormal) {
        out.flush
        out.close
    }
}
** Stream implementation that pipes frame payloads to an external FFmpeg process, with the
** output file passed as the last FFmpeg argument.
internal class VideoStreamerToMp4File : VideoStreamerImpl {
    private static const Log log := VideoStreamer#.pod.log

    ** The running FFmpeg process.
    Process2 ffmpegProcess

    ** Starts FFmpeg with the given args; throws 'IOErr' if the executable does not exist.
    new make(Uri ffmpegPath, Str[] ffmpegArgs, File output, ActorPool actorPool, Duration throttle, Bool quiet) {
        exe := ffmpegPath.toFile.normalize
        if (exe.exists.not)
            throw IOErr("${exe.osPath} does not exist")

        // command line = executable + supplied args + output file
        cmd := ffmpegArgs.rw
        cmd.insert(0, exe.osPath)
        cmd.add(output.normalize.osPath)

        // kick off an ffmpeg process
        this.ffmpegProcess = Process2(cmd) {
            it.actorPool = actorPool
            it.mergeErr  = false
            it.env.clear
        }.run

        // relay FFmpeg's console output, unless told to keep quiet
        ffmpegProcess.pipeFromStdOut(quiet ? null : Env.cur.out, throttle)
        ffmpegProcess.pipeFromStdErr(quiet ? null : Env.cur.err, throttle)

        log.info("Streaming video to ${output.normalize.osPath}")
    }

    ** Writes the frame payload to FFmpeg's stdin, but only while the process is running.
    override Void onVideoFrame(Buf payload, PaveHeader pave) {
        if (ffmpegProcess.isRunning.not) return
        ffmpegProcess.stdIn.writeBuf(payload).flush
    }

    ** Kills the FFmpeg process.
    override Void onDisconnect(Bool abnormal) {
        ffmpegProcess.kill
    }
}
** Stream implementation that pipes frame payloads to an external FFmpeg process and reads
** PNG images back from its stdout, passing each one to the 'onNewPngImage' callback.
internal class VideoStreamerToPngEvents : VideoStreamerImpl {
    private static const Log log := VideoStreamer#.pod.log

    ** The running FFmpeg process.
    Process2 ffmpegProcess

    ** Callback invoked with each (immutable) PNG image read from FFmpeg.
    |Buf| onNewPngImage

    ** Starts FFmpeg with the given args and a reader thread for its output.
    ** Throws 'IOErr' if the executable does not exist.
    new make(Uri ffmpegPath, Str[] ffmpegArgs, ActorPool actorPool, Duration throttle, Bool quiet, |Buf| onNewPngImage) {
        this.onNewPngImage = onNewPngImage

        ffmpegFile := ffmpegPath.toFile.normalize
        if (!ffmpegFile.exists)
            throw IOErr("${ffmpegFile.osPath} does not exist")

        // kick off an ffmpeg process
        ffmpegArgs = ffmpegArgs.rw
        ffmpegArgs.insert(0, ffmpegFile.osPath)
        ffmpegProcess = Process2(ffmpegArgs) {
            it.actorPool = actorPool
            it.mergeErr  = false
            it.env.clear
        }.run

        // stdout carries the PNG data, so only stderr is relayed to the console
        ffmpegProcess.pipeFromStdErr(quiet ? null : Env.cur.err, throttle)

        // kick off a thread to read data from the ffmpeg output
        pngReaderThread := Synchronized(actorPool)
        // the process is mutable, so wrap it to get it into the async closure
        processRef := Unsafe(ffmpegProcess)
        pngReaderThread.async |->| {
            process  := (Process2) processRef.val
            inStream := process.stdOut
            while (process.isAlive) {
                pngBuf := readPng(inStream)
                if (pngBuf != null) {
                    onNewPngImage(pngBuf.toImmutable)
                    pngBuf.clear.trim
                    pngBuf = null
                }
            }
        }

        log.info("Streaming video to PNG images")
    }

    ** Writes the frame payload to FFmpeg's stdin, but only while the process is running
    ** (same guard as 'VideoStreamerToMp4File').
    override Void onVideoFrame(Buf payload, PaveHeader pave) {
        if (ffmpegProcess.isRunning)
            ffmpegProcess.stdIn.writeBuf(payload).flush
    }

    ** Kills the FFmpeg process.
    override Void onDisconnect(Bool abnormal) {
        ffmpegProcess.kill
    }

    ** Luckily, the PNG format is easy to read and easy to find the end of an image.
    **
    ** Reads the 8 byte signature followed by chunks up to and including 'IEND', and returns
    ** them as a flipped 'Buf'. Returns 'null' if an error occurs while reading.
    static Buf? readPng(InStream in) {
        pngMagicNum := 0x89_50_4E_47_0D_0A_1A_0A
        try {
            pngBuf := Buf() { endian = Endian.big }

            bufMagicNum := in.readS8
            pngBuf.writeI8(bufMagicNum)
            if (bufMagicNum != pngMagicNum)
                // print as hex so the value matches its "0x" prefix
                log.warn("Invalid PNG Magic Num: 0x${bufMagicNum.toHex(16).upper}")

            pngEnd := false
            while (!pngEnd) {
                // each chunk is: length (4 bytes), type (4 chars), data (length bytes), CRC (4 bytes)
                chunkSize := in.readU4
                chunkType := in.readChars(4)
                pngBuf.writeI4(chunkSize)
                pngBuf.writeChars(chunkType)
                // seek back to the end of the buf, ready for the next write
                in.readBufFully(pngBuf, chunkSize + 4).seek(pngBuf.size) // 4 bytes = CRC
                pngEnd = (chunkType == "IEND") // 0x "49 45 4e 44"
            }
            return pngBuf.flip

        } catch (IOErr ioe) {
            // meh - we usually get 'Unexpected end of stream' when FFmpeg is killed
        } catch (Err err) {
            log.err("VideoStreamerToPngEvents - ${err.typeof} - ${err.msg}")
        }
        return null
    }
}