diff --git a/EWS_POC.iml b/EWS_POC.iml
index ebc327c..34ef67c 100644
--- a/EWS_POC.iml
+++ b/EWS_POC.iml
@@ -12,7 +12,6 @@
-
diff --git a/src/Main.kt b/src/Main.kt
index 641859f..cf2673f 100644
--- a/src/Main.kt
+++ b/src/Main.kt
@@ -1,9 +1,56 @@
+import somecodes.Codes.Companion.ValidString
import zello.ZelloClient
+import zello.ZelloEvent
//TIP To Run code, press or
// click the icon in the gutter.
fun main() {
val z = ZelloClient.fromConsumerZello()
+ z.Start(object : ZelloEvent {
+ override fun onChannelStatus(
+ channel: String,
+ status: String,
+ userOnline: Int,
+ error: String?,
+ errorType: String?
+ ) {
+ println("Channel Status: $channel is $status with $userOnline users online.")
+ if (ValidString(error) && ValidString(errorType)) {
+ println("Error: $error, Type: $errorType")
+ }
+ }
+ override fun onAudioData(streamID: Int, from: String, For: String, channel: String, data: ByteArray) {
+ println("Audio Data received from $from for $For on channel $channel with streamID $streamID ")
+ }
+
+ override fun onThumbnailImage(imageID: Int, from: String, For: String, channel: String, data: ByteArray, timestamp: Long) {
+ println("Thumbnail Image received from $from for $For on channel $channel with imageID $imageID at timestamp $timestamp")
+ }
+
+ override fun onFullImage(imageID: Int, from: String, For: String, channel: String, data: ByteArray, timestamp: Long) {
+ println("Full Image received from $from for $For on channel $channel with imageID $imageID at timestamp $timestamp")
+ }
+
+ override fun onTextMessage(messageID: Int, from: String, For: String, channel: String, text: String, timestamp: Long) {
+ println("Text Message received from $from for $For on channel $channel with messageID $messageID at timestamp $timestamp. Text: $text")
+ }
+
+ override fun onLocation(messageID: Int, from: String, For: String, channel: String, latitude: Double, longitude: Double, address: String, accuracy: Double, timestamp: Long) {
+ println("Location received from $from for $For on channel $channel with messageID $messageID at timestamp $timestamp. Location($latitude,$longitude), Address:$address, Accuracy:$accuracy")
+ }
+
+ override fun onConnected() {
+ println("Connected to Zello server.")
+ }
+
+ override fun onDisconnected() {
+ println("Disconnected from Zello server.")
+ }
+
+ override fun onError(errorMessage: String) {
+ println("Error occurred in Zello client: $errorMessage")
+ }
+ })
}
\ No newline at end of file
diff --git a/src/somecodes/Codes.kt b/src/somecodes/Codes.kt
new file mode 100644
index 0000000..2040bc2
--- /dev/null
+++ b/src/somecodes/Codes.kt
@@ -0,0 +1,10 @@
+package somecodes
+
+class Codes {
+
+ companion object{
+ fun ValidString(s : String?) : Boolean {
+ return s != null && s.isNotEmpty() && s.isNotBlank()
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/zello/Event_OnStreamStop.kt b/src/zello/Event_OnStreamStop.kt
index fca4e86..c874054 100644
--- a/src/zello/Event_OnStreamStop.kt
+++ b/src/zello/Event_OnStreamStop.kt
@@ -5,6 +5,6 @@ import com.fasterxml.jackson.annotation.JsonPropertyOrder
@Suppress("unused")
@JsonPropertyOrder(value = ["command", "stream_id"])
class Event_OnStreamStop {
- var command: String = "" // must be "on_stream_stop
+ var command: String = "" // must be "on_stream_stop"
var stream_id: Int = 0
}
\ No newline at end of file
diff --git a/src/zello/ZelloAudioJob.kt b/src/zello/ZelloAudioJob.kt
new file mode 100644
index 0000000..38ea038
--- /dev/null
+++ b/src/zello/ZelloAudioJob.kt
@@ -0,0 +1,37 @@
+package zello
+
+import java.io.ByteArrayOutputStream
+
+/**
+ * Stream audio job for Zello client.
+ * @param: codec_header The codec header for the audio stream.
+ * @param: packet_duration The duration of each audio packet in milliseconds. Values between 2.5 and 60 ms are supported.
+ * @param: channel The channel to which the audio stream belongs.
+ * @param: streamID The unique identifier for the audio stream.
+ * @param: from The user who is sending the audio stream.
+ * @param: For Optional. The user for whom the audio stream is intended. If not specified, the audio is sent to the channel.
+ */
+@Suppress("unused")
+class ZelloAudioJob(codec_header: String, packet_duration: Int, val channel: String, val streamID: Int, val from: String, val For: String?=null) {
+ val type = "audio" // must be "audio" for audio jobs
+ val codec = "opus" // Default audio codec
+ private val baos = ByteArrayOutputStream()
+ fun pushAudioData(data: ByteArray){
+ baos.write(data)
+ }
+ fun getAudioData(): ByteArray {
+ return baos.toByteArray()
+ }
+ companion object {
+ fun fromEventOnStreamStart(x : Event_OnStreamStart) : ZelloAudioJob {
+ return ZelloAudioJob(
+ x.codec_header,
+ x.packet_duration,
+ x.channel,
+ x.stream_id,
+ x.from,
+ x.For
+ )
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/zello/ZelloClient.kt b/src/zello/ZelloClient.kt
index 0edb210..6117863 100644
--- a/src/zello/ZelloClient.kt
+++ b/src/zello/ZelloClient.kt
@@ -7,6 +7,7 @@ import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.lang.Exception
import java.net.URI
+import java.nio.ByteBuffer
@Suppress("unused")
/**
@@ -15,6 +16,9 @@ import java.net.URI
*
*/
class ZelloClient(val address : URI) {
+ private val streamJob = HashMap()
+ private val imageJob = HashMap()
+ private val commandJob = HashMap()
companion object {
fun fromConsumerZello() : ZelloClient {
return ZelloClient(URI.create("wss://zello.io/ws"))
@@ -30,32 +34,158 @@ class ZelloClient(val address : URI) {
}
private var client : WebSocketClient? = null
-
+ private var seqID : Int = 0
// this key is temporary, valid only 30 days from 2025-07-29
// if need to create, from https://developers.zello.com/keys
private val developerKey : String = "eyJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJpc3MiOiJXa002Y21ScllYSjBiMjV2T2pFLi1yYjJ2THFRbUhYV3dKY2I2azl2TDdUMEtzRWZMRjcxZm5jcktTZ0s2ZE0iLCJleHAiOjE3NTY0MzIyMTIsImF6cCI6ImRldiJ9.ANK7BIS6WVVWsQRjcZXyGWrV2RodCUQD4WXWaA6E4Dlyy8bBCMFdbiKN2D7B_x729HQULailnfRhbXF4Avfg14qONdc1XE_0iGiPUO1kfUSgdd11QylOzjxy6FTKSeZmHOh65JZq2dIWxobCcva-RPvbR8TA656upHh32xrWv9zlU0N707FTca04kze0Iq-q-uC5EL82yK10FEvOPDX88MYy71QRYi8Qh_KbSyMcYAhe2bTsiyjm51ZH9ntkRHd0HNiaijNZI6-qXkkp5Soqmzh-bTtbbgmbX4BT3Qpz_IP3epaX3jl_Aq5DHxXwCsJ9FThif9um5D0TWVGQteR0cQ"
+ // default channel to join
+ private var channels = arrayOf("GtcDev2025")
+ // refresh token for the session
+ // this is set after the first LogonReply
+ private var refresh_token: String? = null
+
private val logger : Logger = LoggerFactory.getLogger(ZelloClient::class.java)
- var WebsocketIsConnected : Boolean = false
private val mapper = jacksonObjectMapper()
- fun Start(){
+ fun Start(event: ZelloEvent){
client = object : WebSocketClient(address) {
override fun onOpen(handshakedata: ServerHandshake?) {
logger.info("Connected to $address")
- WebsocketIsConnected = true
-
+ seqID = 0
+ inc_seqID()
+ val lg = LogonCommand.create(seqID,channels, developerKey)
+ val value = mapper.writeValueAsString(lg)
+ logger.info("Sending LogonCommand: $value")
+ send(value)
}
- override fun onMessage(message: String?) {
- logger.info("Message received: $message")
- val jsnode = mapper.readTree(message)
- when (jsnode["command"]?.asText()){
+
+ override fun onMessage(bytes: ByteBuffer?) {
+ val receivedbytes = bytes?.capacity() ?: 0
+ logger.info("Binary message received, length: $receivedbytes")
+ if (receivedbytes>0){
+ when(bytes?.get(0)){
+ 0x01.toByte() -> {
+ // audio stream
+ // format {type(8) = 0x01, stream_id(32), packet_id(32), data[]}
+ val str_id = bytes.getInt(1)
+ val packet_id = bytes.getInt(5)
+ val data = ByteArray(receivedbytes - 9)
+ bytes.get(data, 9, receivedbytes - 9)
+ logger.info("Audio stream received, stream_id=$str_id, packet_id=$packet_id, data length=${data.size}")
+
+ streamJob[str_id]?.pushAudioData(data)
+ }
+ 0x02.toByte() ->{
+ // image stream
+ // tumbnail packet : format {type(8) = 0x02, image_id(32), image_type(32) = 0x02, data[]}
+ // full image packet : format {type(8) = 0x02, image_id(32), image_type(32) = 0x01, data[]}
+ val image_id = bytes.getInt(1)
+ val image_type = bytes.getInt(5)
+ val data = ByteArray(receivedbytes - 9)
+ bytes.get(data, 9, receivedbytes - 9)
+ logger.info("Image stream received, image_id=$image_id, image_type=$image_type, data length=${data.size}")
+ // Here you can process the image data
+ // image_type 0x01 is full image, 0x02 is thumbnail
+ when (image_type) {
+ 0x01 -> {
+ // full image
+ imageJob[image_id]?.pushFull(data)
+ }
+ 0x02 -> {
+ // thumbnail image
+ imageJob[image_id]?.pushThumbnail(data)
+ }
+ else -> {
+ logger.warn("Unknown image type: $image_type")
+ }
+ }
+ }
+ else -> {
+ logger.warn("Unknown binary message type: ${bytes?.get(0)}")
+ }
+ }
}
}
+
+ override fun onMessage(message: String?) {
+ logger.info("Message received: $message")
+ val jsnode = mapper.readTree(message)
+ if (jsnode["seq"] != null) {
+ val seq = jsnode["seq"].asInt()
+ if (seq == 1){
+ //logon reply
+ val lgreply = mapper.treeToValue(jsnode, LogonReply::class.java)
+ if (lgreply.success==true){
+ logger.info("successfully logged on, refresh_token=${lgreply.refresh_token}")
+ refresh_token = lgreply.refresh_token
+ event.onConnected()
+ } else {
+ logger.error("Failed to logon: ${lgreply.error ?: "Unknown error"}")
+ }
+ } else {
+ // other commands
+ when (jsnode["command"]?.asText()){
+ "on_channel_status" -> {
+ val channelstatus = mapper.treeToValue(jsnode, Event_OnChannelStatus::class.java)
+ event.onChannelStatus(channelstatus.channel, channelstatus.status, channelstatus.users_online, channelstatus.error, channelstatus.error_type)
+ }
+ "on_error" -> {
+ val error = mapper.treeToValue(jsnode, Event_OnError::class.java)
+ event.onError(error.error)
+ }
+ "on_stream_start" ->{
+ val streamstart = mapper.treeToValue(jsnode, Event_OnStreamStart::class.java)
+ logger.info("Stream started on channel ${streamstart.channel} from ${streamstart.from} for ${streamstart.For}")
+ streamJob.put(streamstart.stream_id, ZelloAudioJob.fromEventOnStreamStart(streamstart))
+ }
+ "on_stream_stop" -> {
+ val streamstop = mapper.treeToValue(jsnode, Event_OnStreamStop::class.java)
+ logger.info("Stream stopped on channel ${streamstop.stream_id}")
+ val job = streamJob.remove(streamstop.stream_id)
+ if (job!=null){
+ event.onAudioData(job.streamID, job.from, job.For?:"", job.channel, job.getAudioData())
+ }
+ }
+ "on_image" ->{
+ val image = mapper.treeToValue(jsnode, Event_OnImage::class.java)
+ logger.info("Image received, image_id=${image.message_id}, from=${image.from}, For=${image.For}, channel=${image.channel}")
+ imageJob.put(image.message_id, ZelloImageJob(image.channel,image.message_id,image.height,image.width,image.source,image.from,image.For))
+ }
+ "on_text_message" ->{
+ val textmessage = mapper.treeToValue(jsnode, Event_OnTextMessage::class.java)
+ event.onTextMessage(textmessage.message_id, textmessage.from, textmessage.For, textmessage.channel, textmessage.text, System.currentTimeMillis())
+ }
+ "on_location" -> {
+ val location = mapper.treeToValue(jsnode, Event_OnLocation::class.java)
+ event.onLocation(location.message_id, location.from, location.For, location.channel, location.latitude, location.longitude, location.formatted_address, location.accuracy, System.currentTimeMillis())
+ }
+ else ->{
+ // handle other commands
+ logger.warn("Unknown command: ${jsnode["command"]?.asText()}")
+ }
+ }
+ }
+ }
+
+ }
+
override fun onClose(code: Int, reason: String?, remote: Boolean) {
logger.info("Closed from ${if (remote) "Remote side" else "Local side"}, Code=$code, Reason=$reason")
- WebsocketIsConnected = false
+ event.onDisconnected()
+ // try reconnecting after 10 seconds
+ val thread = Thread {
+ try {
+ Thread.sleep(10000)
+ connect()
+ } catch (e: InterruptedException) {
+ logger.error("Reconnection interrupted: ${e.message}")
+ }
+ }
+ thread.name= "ZelloClient-ReconnectThread"
+ thread.isDaemon = true
+ thread.start()
}
override fun onError(ex: Exception?) {
@@ -69,7 +199,14 @@ class ZelloClient(val address : URI) {
client?.close()
}
-
+ private fun inc_seqID(): Int{
+ if (seqID < Int.MAX_VALUE) {
+ seqID++
+ } else {
+ seqID = 0
+ }
+ return seqID
+ }
diff --git a/src/zello/ZelloEvent.kt b/src/zello/ZelloEvent.kt
new file mode 100644
index 0000000..0980751
--- /dev/null
+++ b/src/zello/ZelloEvent.kt
@@ -0,0 +1,14 @@
+package zello
+
+@Suppress("unused")
+interface ZelloEvent {
+ fun onChannelStatus(channel: String, status: String, userOnline: Int, error: String? = null, errorType: String? = null)
+ fun onAudioData(streamID: Int, from:String, For: String, channel: String, data: ByteArray)
+ fun onThumbnailImage(imageID: Int, from:String, For: String, channel: String, data: ByteArray, timestamp: Long)
+ fun onFullImage(imageID: Int,from: String, For: String, channel: String, data: ByteArray, timestamp: Long)
+ fun onTextMessage(messageID: Int, from: String, For: String, channel: String, text: String, timestamp: Long)
+ fun onLocation(messageID: Int, from: String, For: String, channel: String, latitude: Double, longitude: Double, address:String, accuracy: Double, timestamp: Long )
+ fun onConnected()
+ fun onDisconnected()
+ fun onError(errorMessage: String)
+}
\ No newline at end of file
diff --git a/src/zello/ZelloImageJob.kt b/src/zello/ZelloImageJob.kt
new file mode 100644
index 0000000..981b2b8
--- /dev/null
+++ b/src/zello/ZelloImageJob.kt
@@ -0,0 +1,32 @@
+package zello
+
+import java.io.ByteArrayOutputStream
+
+/**
+ * Zello Image Receiving Job
+ * @param channel The channel name where the image is received
+ * @param messageID The unique identifier for the image message
+ * @param height The height of the image (some clients dont provide this value)
+ * @param width The width of the image (some clients dont provide this value)
+ * @param source The source of the image ("camera" or "library")
+ * @param from The sender of the image
+ * @param For The recipient of the image (optional, if it was sent with "for" parameter)
+ */
+@Suppress("unused")
+class ZelloImageJob(val channel: String, val messageID: Int, val height: Int, val width: Int, val source:String, val from: String, val For: String?=null) {
+ val type = "jpeg" // Default image type
+ private val thumbnailbaos = ByteArrayOutputStream()
+ private val fullbaos = ByteArrayOutputStream()
+ fun pushThumbnail(data: ByteArray) {
+ thumbnailbaos.write(data)
+ }
+ fun pushFull(data: ByteArray) {
+ fullbaos.write(data)
+ }
+ fun getThumbnail(): ByteArray {
+ return thumbnailbaos.toByteArray()
+ }
+ fun getFull(): ByteArray {
+ return fullbaos.toByteArray()
+ }
+}
\ No newline at end of file