using concurrent::Actor
using concurrent::ActorPool
using concurrent::AtomicRef
using afConcurrent::SynchronizedList
using afConcurrent::SynchronizedState
using inet::TcpSocket
internal const class VideoReader {
private const Log log := Drone#.pod.log
private const Drone drone
private const ActorPool actorPool
private const AtomicRef frameListenerRef := AtomicRef()
private const AtomicRef errorListenerRef := AtomicRef()
private const SynchronizedState mutex
private const Int port
new make(Drone drone, ActorPool actorPool, Int port) {
this.drone = drone
this.actorPool = actorPool
this.port = port
this.mutex = SynchronizedState(actorPool) |->Obj?| {
VideoReaderImpl(drone.networkConfig, port)
}
}
** Only internal listeners should be added here.
Void setFrameListener(|Buf, PaveHeader| f) {
frameListenerRef.val = f
}
Void setErrorListener(|Err| f) {
errorListenerRef.val = f
}
Void connect() {
mutex.getState |VideoReaderImpl reader| {
reader.connect
if (reader.isConnected && !actorPool.isStopped)
readVidData
}
}
Bool isConnected() {
mutex.getState |VideoReaderImpl reader->Bool| {
reader.isConnected
}
}
Void disconnect() {
if (!mutex.lock.actor.pool.isStopped)
try mutex.getState |VideoReaderImpl reader| {
reader.disconnect
}
catch { /* meh */ }
frameListenerRef.val = null
errorListenerRef.val = null
}
private Void readVidData() {
mutex.withState |VideoReaderImpl reader| {
doReadVidData(reader)
if (reader.isConnected && !actorPool.isStopped)
readVidData
}
}
private Void doReadVidData(VideoReaderImpl reader) {
pave := null as PaveHeader
try pave = reader.receive
catch (IOErr err)
// drone.isConnected is set to false *before* we stop the ActorPool
if (reader.isConnected && drone.isConnected)
((|Err|?) errorListenerRef.val)?.call(err)
catch (Err err)
((|Err|?) errorListenerRef.val)?.call(err)
// call internal listeners
if (pave != null)
((|Buf, PaveHeader|?) frameListenerRef.val)?.call(pave.payload, pave)
pave = null
}
}
internal class VideoReaderImpl {
const Log log := Drone#.pod.log
const Int port
const NetworkConfig config
TcpSocket? socket
new make(NetworkConfig config, Int port) {
this.config = config
this.port = port
}
Void connect() {
if (isConnected)
disconnect
this.socket = TcpSocket {
it.options.receiveTimeout = config.tcpReceiveTimeout
}.connect(config.droneIpAddr, port, config.actionTimeout)
}
Bool isConnected() {
socket != null && socket.isConnected
}
Void disconnect() {
socket?.close
socket = null
}
PaveHeader? receive() {
if (socket == null) return null
in := socket.in { endian = Endian.little }
// may need to wait until we have all the header data -> readBufFully() ??
return PaveHeader {
signature = in.readChars(4)
version = uint8(in)
videoCodec = uint8(in)
headerSize = uint16(in)
payloadSize = uint32(in)
encodedStreamWidth = uint16(in)
encodedStreamHeight = uint16(in)
displayWidth = uint16(in)
displayHeight = uint16(in)
frameNumber = uint32(in)
timestamp = 1ms * uint32(in)
totalChunks = uint8(in)
chunkIndex = uint8(in)
frameType = uint8(in)
control = uint8(in)
streamBytePosition = uint32(in) + (uint32(in).shiftl(32))
streamId = uint16(in)
totalSlices = uint8(in)
sliceIndex = uint8(in)
header1Size = uint8(in)
header2Size = uint8(in)
in.skip(2)
advertisedSize = uint32(in)
in.skip(12)
if (signature != "PaVE")
// meh - lets carry on regardless
log.warn("Invalid PaVE signature: ${signature}")
// stupid kludge for https://projects.ardrone.org/issues/show/159
in.skip(headerSize - 64)
payload = in.readBufFully(null, payloadSize)
}
}
private static Bool bool (InStream in) { in.readU1 != 0 }
private static Int uint8 (InStream in) { in.readU1 }
private static Int uint16 (InStream in) { in.readU2 }
private static Int uint32 (InStream in) { in.readU4 }
private static Int int16 (InStream in) { in.readS2 }
private static Int int32 (InStream in) { in.readS4 }
private static Int int64 (InStream in) { in.readS8 }
private static Float float32 (InStream in) { Float.makeBits32(in.readU4) }
private static Float double64(InStream in) { Float.makeBits(in.readS8) }
}
** Parrot Video Encapsulation (PaVE) headers for video frame data.
** Passed to the [drone.onVideoFrame()]`Drone.onVideoFrame` event hook.
const class PaveHeader {
** "PaVE" - used to identify the start of frame
const Str signature
** Version code
const Int version
** Codec of the following frame
const Int videoCodec
** Size of the parrot_video_encapsulation_t
const Int headerSize
** Amount of data following this PaVE
const Int payloadSize
** Example: 640
const Int encodedStreamWidth
** Example: 368
const Int encodedStreamHeight
** Example: 640
const Int displayWidth
** Example: 360
const Int displayHeight
** Frame position inside the current stream
const Int frameNumber
** In milliseconds
const Duration timestamp
** Number of UDP packets containing the current decodable payload - currently unused
const Int totalChunks
** Position of the packet - first chunk is #0 - currenty unused
const Int chunkIndex
** I-frame, P-frame - parrot_video_encapsulation_frametypes_t
const Int frameType
** Special commands like end-of-stream or advertised frames
const Int control
** Byte position of the current payload in the encoded stream
const Int streamBytePosition
** This ID indentifies packets that should be recorded together
const Int streamId
** Number of slices composing the current frame
const Int totalSlices
** Position of the current slice in the frame
const Int sliceIndex
** H.264 only : size of SPS inside payload - no SPS present if value is zero
const Int header1Size
** H.264 only : size of PPS inside payload - no PPS present if value is zero
const Int header2Size
** Size of frames announced as advertised frames
const Int advertisedSize
** The raw video frame data
const Buf payload
@NoDoc
new make(|This| f) { f(this) }
}