From 126172aa734c3fc94119cde0e0d2302b5557e69e Mon Sep 17 00:00:00 2001 From: rdkartono Date: Thu, 31 Jul 2025 11:43:38 +0700 Subject: [PATCH] Commit 31/07/2025 --- EWS_POC.iml | 1 - src/Main.kt | 47 ++++++++++ src/somecodes/Codes.kt | 10 ++ src/zello/Event_OnStreamStop.kt | 2 +- src/zello/ZelloAudioJob.kt | 37 ++++++++ src/zello/ZelloClient.kt | 159 +++++++++++++++++++++++++++++--- src/zello/ZelloEvent.kt | 14 +++ src/zello/ZelloImageJob.kt | 32 +++++++ 8 files changed, 289 insertions(+), 13 deletions(-) create mode 100644 src/somecodes/Codes.kt create mode 100644 src/zello/ZelloAudioJob.kt create mode 100644 src/zello/ZelloEvent.kt create mode 100644 src/zello/ZelloImageJob.kt diff --git a/EWS_POC.iml b/EWS_POC.iml index ebc327c..34ef67c 100644 --- a/EWS_POC.iml +++ b/EWS_POC.iml @@ -12,7 +12,6 @@ - diff --git a/src/Main.kt b/src/Main.kt index 641859f..cf2673f 100644 --- a/src/Main.kt +++ b/src/Main.kt @@ -1,9 +1,56 @@ +import somecodes.Codes.Companion.ValidString import zello.ZelloClient +import zello.ZelloEvent //TIP To Run code, press or // click the icon in the gutter. fun main() { val z = ZelloClient.fromConsumerZello() + z.Start(object : ZelloEvent { + override fun onChannelStatus( + channel: String, + status: String, + userOnline: Int, + error: String?, + errorType: String? + ) { + println("Channel Status: $channel is $status with $userOnline users online.") + if (ValidString(error) && ValidString(errorType)) { + println("Error: $error, Type: $errorType") + } + } + override fun onAudioData(streamID: Int, from: String, For: String, channel: String, data: ByteArray) { + println("Audio Data received from $from for $For on channel $channel with streamID $streamID ") + } + + override fun onThumbnailImage(imageID: Int, from: String, For: String, channel: String, data: ByteArray, timestamp: Long) { + println("Thumbnail Image received from $from for $For on channel $channel with imageID $imageID at timestamp $timestamp") + } + + override fun onFullImage(imageID: Int, from: String, For: String, channel: String, data: ByteArray, timestamp: Long) { + println("Full Image received from $from for $For on channel $channel with imageID $imageID at timestamp $timestamp") + } + + override fun onTextMessage(messageID: Int, from: String, For: String, channel: String, text: String, timestamp: Long) { + println("Text Message received from $from for $For on channel $channel with messageID $messageID at timestamp $timestamp. Text: $text") + } + + override fun onLocation(messageID: Int, from: String, For: String, channel: String, latitude: Double, longitude: Double, address: String, accuracy: Double, timestamp: Long) { + println("Location received from $from for $For on channel $channel with messageID $messageID at timestamp $timestamp. Location($latitude,$longitude), Address:$address, Accuracy:$accuracy") + } + + override fun onConnected() { + println("Connected to Zello server.") + } + + override fun onDisconnected() { + println("Disconnected from Zello server.") + } + + override fun onError(errorMessage: String) { + println("Error occurred in Zello client: $errorMessage") + } + }) } \ No newline at end of file diff --git a/src/somecodes/Codes.kt b/src/somecodes/Codes.kt new file mode 100644 index 0000000..2040bc2 --- /dev/null +++ b/src/somecodes/Codes.kt @@ -0,0 +1,10 @@ +package somecodes + +class Codes { + + companion object{ + fun ValidString(s : String?) : Boolean { + return s != null && s.isNotEmpty() && s.isNotBlank() + } + } +} \ No newline at end of file diff --git a/src/zello/Event_OnStreamStop.kt b/src/zello/Event_OnStreamStop.kt index fca4e86..c874054 100644 --- a/src/zello/Event_OnStreamStop.kt +++ b/src/zello/Event_OnStreamStop.kt @@ -5,6 +5,6 @@ import com.fasterxml.jackson.annotation.JsonPropertyOrder @Suppress("unused") @JsonPropertyOrder(value = ["command", "stream_id"]) class Event_OnStreamStop { - var command: String = "" // must be "on_stream_stop + var command: String = "" // must be "on_stream_stop" var stream_id: Int = 0 } \ No newline at end of file diff --git a/src/zello/ZelloAudioJob.kt b/src/zello/ZelloAudioJob.kt new file mode 100644 index 0000000..38ea038 --- /dev/null +++ b/src/zello/ZelloAudioJob.kt @@ -0,0 +1,37 @@ +package zello + +import java.io.ByteArrayOutputStream + +/** + * Stream audio job for Zello client. + * @param: codec_header The codec header for the audio stream. + * @param: packet_duration The duration of each audio packet in milliseconds. Values between 2.5 and 60 ms are supported. + * @param: channel The channel to which the audio stream belongs. + * @param: streamID The unique identifier for the audio stream. + * @param: from The user who is sending the audio stream. + * @param: For Optional. The user for whom the audio stream is intended. If not specified, the audio is sent to the channel. + */ +@Suppress("unused") +class ZelloAudioJob(codec_header: String, packet_duration: Int, val channel: String, val streamID: Int, val from: String, val For: String?=null) { + val type = "audio" // must be "audio" for audio jobs + val codec = "opus" // Default audio codec + private val baos = ByteArrayOutputStream() + fun pushAudioData(data: ByteArray){ + baos.write(data) + } + fun getAudioData(): ByteArray { + return baos.toByteArray() + } + companion object { + fun fromEventOnStreamStart(x : Event_OnStreamStart) : ZelloAudioJob { + return ZelloAudioJob( + x.codec_header, + x.packet_duration, + x.channel, + x.stream_id, + x.from, + x.For + ) + } + } +} \ No newline at end of file diff --git a/src/zello/ZelloClient.kt b/src/zello/ZelloClient.kt index 0edb210..6117863 100644 --- a/src/zello/ZelloClient.kt +++ b/src/zello/ZelloClient.kt @@ -7,6 +7,7 @@ import org.slf4j.Logger import org.slf4j.LoggerFactory import java.lang.Exception import java.net.URI +import java.nio.ByteBuffer @Suppress("unused") /** @@ -15,6 +16,9 @@ import java.net.URI * */ class ZelloClient(val address : URI) { + private val streamJob = HashMap() + private val imageJob = HashMap() + private val commandJob = HashMap() companion object { fun fromConsumerZello() : ZelloClient { return ZelloClient(URI.create("wss://zello.io/ws")) @@ -30,32 +34,158 @@ class ZelloClient(val address : URI) { } private var client : WebSocketClient? = null - + private var seqID : Int = 0 // this key is temporary, valid only 30 days from 2025-07-29 // if need to create, from https://developers.zello.com/keys private val developerKey : String = "eyJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJpc3MiOiJXa002Y21ScllYSjBiMjV2T2pFLi1yYjJ2THFRbUhYV3dKY2I2azl2TDdUMEtzRWZMRjcxZm5jcktTZ0s2ZE0iLCJleHAiOjE3NTY0MzIyMTIsImF6cCI6ImRldiJ9.ANK7BIS6WVVWsQRjcZXyGWrV2RodCUQD4WXWaA6E4Dlyy8bBCMFdbiKN2D7B_x729HQULailnfRhbXF4Avfg14qONdc1XE_0iGiPUO1kfUSgdd11QylOzjxy6FTKSeZmHOh65JZq2dIWxobCcva-RPvbR8TA656upHh32xrWv9zlU0N707FTca04kze0Iq-q-uC5EL82yK10FEvOPDX88MYy71QRYi8Qh_KbSyMcYAhe2bTsiyjm51ZH9ntkRHd0HNiaijNZI6-qXkkp5Soqmzh-bTtbbgmbX4BT3Qpz_IP3epaX3jl_Aq5DHxXwCsJ9FThif9um5D0TWVGQteR0cQ" + // default channel to join + private var channels = arrayOf("GtcDev2025") + // refresh token for the session + // this is set after the first LogonReply + private var refresh_token: String? = null + private val logger : Logger = LoggerFactory.getLogger(ZelloClient::class.java) - var WebsocketIsConnected : Boolean = false private val mapper = jacksonObjectMapper() - fun Start(){ + fun Start(event: ZelloEvent){ client = object : WebSocketClient(address) { override fun onOpen(handshakedata: ServerHandshake?) { logger.info("Connected to $address") - WebsocketIsConnected = true - + seqID = 0 + inc_seqID() + val lg = LogonCommand.create(seqID,channels, developerKey) + val value = mapper.writeValueAsString(lg) + logger.info("Sending LogonCommand: $value") + send(value) } - override fun onMessage(message: String?) { - logger.info("Message received: $message") - val jsnode = mapper.readTree(message) - when (jsnode["command"]?.asText()){ + + override fun onMessage(bytes: ByteBuffer?) { + val receivedbytes = bytes?.capacity() ?: 0 + logger.info("Binary message received, length: $receivedbytes") + if (receivedbytes>0){ + when(bytes?.get(0)){ + 0x01.toByte() -> { + // audio stream + // format {type(8) = 0x01, stream_id(32), packet_id(32), data[]} + val str_id = bytes.getInt(1) + val packet_id = bytes.getInt(5) + val data = ByteArray(receivedbytes - 9) + bytes.get(data, 9, receivedbytes - 9) + logger.info("Audio stream received, stream_id=$str_id, packet_id=$packet_id, data length=${data.size}") + + streamJob[str_id]?.pushAudioData(data) + } + 0x02.toByte() ->{ + // image stream + // tumbnail packet : format {type(8) = 0x02, image_id(32), image_type(32) = 0x02, data[]} + // full image packet : format {type(8) = 0x02, image_id(32), image_type(32) = 0x01, data[]} + val image_id = bytes.getInt(1) + val image_type = bytes.getInt(5) + val data = ByteArray(receivedbytes - 9) + bytes.get(data, 9, receivedbytes - 9) + logger.info("Image stream received, image_id=$image_id, image_type=$image_type, data length=${data.size}") + // Here you can process the image data + // image_type 0x01 is full image, 0x02 is thumbnail + when (image_type) { + 0x01 -> { + // full image + imageJob[image_id]?.pushFull(data) + } + 0x02 -> { + // thumbnail image + imageJob[image_id]?.pushThumbnail(data) + } + else -> { + logger.warn("Unknown image type: $image_type") + } + } + } + else -> { + logger.warn("Unknown binary message type: ${bytes?.get(0)}") + } + } } } + + override fun onMessage(message: String?) { + logger.info("Message received: $message") + val jsnode = mapper.readTree(message) + if (jsnode["seq"] != null) { + val seq = jsnode["seq"].asInt() + if (seq == 1){ + //logon reply + val lgreply = mapper.treeToValue(jsnode, LogonReply::class.java) + if (lgreply.success==true){ + logger.info("successfully logged on, refresh_token=${lgreply.refresh_token}") + refresh_token = lgreply.refresh_token + event.onConnected() + } else { + logger.error("Failed to logon: ${lgreply.error ?: "Unknown error"}") + } + } else { + // other commands + when (jsnode["command"]?.asText()){ + "on_channel_status" -> { + val channelstatus = mapper.treeToValue(jsnode, Event_OnChannelStatus::class.java) + event.onChannelStatus(channelstatus.channel, channelstatus.status, channelstatus.users_online, channelstatus.error, channelstatus.error_type) + } + "on_error" -> { + val error = mapper.treeToValue(jsnode, Event_OnError::class.java) + event.onError(error.error) + } + "on_stream_start" ->{ + val streamstart = mapper.treeToValue(jsnode, Event_OnStreamStart::class.java) + logger.info("Stream started on channel ${streamstart.channel} from ${streamstart.from} for ${streamstart.For}") + streamJob.put(streamstart.stream_id, ZelloAudioJob.fromEventOnStreamStart(streamstart)) + } + "on_stream_stop" -> { + val streamstop = mapper.treeToValue(jsnode, Event_OnStreamStop::class.java) + logger.info("Stream stopped on channel ${streamstop.stream_id}") + val job = streamJob.remove(streamstop.stream_id) + if (job!=null){ + event.onAudioData(job.streamID, job.from, job.For?:"", job.channel, job.getAudioData()) + } + } + "on_image" ->{ + val image = mapper.treeToValue(jsnode, Event_OnImage::class.java) + logger.info("Image received, image_id=${image.message_id}, from=${image.from}, For=${image.For}, channel=${image.channel}") + imageJob.put(image.message_id, ZelloImageJob(image.channel,image.message_id,image.height,image.width,image.source,image.from,image.For)) + } + "on_text_message" ->{ + val textmessage = mapper.treeToValue(jsnode, Event_OnTextMessage::class.java) + event.onTextMessage(textmessage.message_id, textmessage.from, textmessage.For, textmessage.channel, textmessage.text, System.currentTimeMillis()) + } + "on_location" -> { + val location = mapper.treeToValue(jsnode, Event_OnLocation::class.java) + event.onLocation(location.message_id, location.from, location.For, location.channel, location.latitude, location.longitude, location.formatted_address, location.accuracy, System.currentTimeMillis()) + } + else ->{ + // handle other commands + logger.warn("Unknown command: ${jsnode["command"]?.asText()}") + } + } + } + } + + } + override fun onClose(code: Int, reason: String?, remote: Boolean) { logger.info("Closed from ${if (remote) "Remote side" else "Local side"}, Code=$code, Reason=$reason") - WebsocketIsConnected = false + event.onDisconnected() + // try reconnecting after 10 seconds + val thread = Thread { + try { + Thread.sleep(10000) + connect() + } catch (e: InterruptedException) { + logger.error("Reconnection interrupted: ${e.message}") + } + } + thread.name= "ZelloClient-ReconnectThread" + thread.isDaemon = true + thread.start() } override fun onError(ex: Exception?) { @@ -69,7 +199,14 @@ class ZelloClient(val address : URI) { client?.close() } - + private fun inc_seqID(): Int{ + if (seqID < Int.MAX_VALUE) { + seqID++ + } else { + seqID = 0 + } + return seqID + } diff --git a/src/zello/ZelloEvent.kt b/src/zello/ZelloEvent.kt new file mode 100644 index 0000000..0980751 --- /dev/null +++ b/src/zello/ZelloEvent.kt @@ -0,0 +1,14 @@ +package zello + +@Suppress("unused") +interface ZelloEvent { + fun onChannelStatus(channel: String, status: String, userOnline: Int, error: String? = null, errorType: String? = null) + fun onAudioData(streamID: Int, from:String, For: String, channel: String, data: ByteArray) + fun onThumbnailImage(imageID: Int, from:String, For: String, channel: String, data: ByteArray, timestamp: Long) + fun onFullImage(imageID: Int,from: String, For: String, channel: String, data: ByteArray, timestamp: Long) + fun onTextMessage(messageID: Int, from: String, For: String, channel: String, text: String, timestamp: Long) + fun onLocation(messageID: Int, from: String, For: String, channel: String, latitude: Double, longitude: Double, address:String, accuracy: Double, timestamp: Long ) + fun onConnected() + fun onDisconnected() + fun onError(errorMessage: String) +} \ No newline at end of file diff --git a/src/zello/ZelloImageJob.kt b/src/zello/ZelloImageJob.kt new file mode 100644 index 0000000..981b2b8 --- /dev/null +++ b/src/zello/ZelloImageJob.kt @@ -0,0 +1,32 @@ +package zello + +import java.io.ByteArrayOutputStream + +/** + * Zello Image Receiving Job + * @param channel The channel name where the image is received + * @param messageID The unique identifier for the image message + * @param height The height of the image (some clients dont provide this value) + * @param width The width of the image (some clients dont provide this value) + * @param source The source of the image ("camera" or "library") + * @param from The sender of the image + * @param For The recipient of the image (optional, if it was sent with "for" parameter) + */ +@Suppress("unused") +class ZelloImageJob(val channel: String, val messageID: Int, val height: Int, val width: Int, val source:String, val from: String, val For: String?=null) { + val type = "jpeg" // Default image type + private val thumbnailbaos = ByteArrayOutputStream() + private val fullbaos = ByteArrayOutputStream() + fun pushThumbnail(data: ByteArray) { + thumbnailbaos.write(data) + } + fun pushFull(data: ByteArray) { + fullbaos.write(data) + } + fun getThumbnail(): ByteArray { + return thumbnailbaos.toByteArray() + } + fun getFull(): ByteArray { + return fullbaos.toByteArray() + } +} \ No newline at end of file