diff --git a/src/Main.kt b/src/Main.kt index 02f618a..3eda42f 100644 --- a/src/Main.kt +++ b/src/Main.kt @@ -1,3 +1,5 @@ +import audio.AudioUtility +import audio.OpusStreamReceiver import somecodes.Codes.Companion.ValidString import zello.ZelloClient import zello.ZelloEvent @@ -5,8 +7,23 @@ import zello.ZelloEvent //TIP To Run code, press or // click the icon in the gutter. fun main() { + val au = AudioUtility() + var audioID = 0 + val preferedAudioDevice = "Speakers" + au.DetectPlaybackDevices().forEach { pair -> + println("Device ID: ${pair.first}, Name: ${pair.second}") + if (pair.second.contains(preferedAudioDevice)) { + audioID = pair.first + } + } + if (audioID!=0){ + val initsuccess = au.InitDevice(audioID) + println("Audio Device $audioID initialized: $initsuccess") + } - val z = ZelloClient.fromConsumerZello() + val o = OpusStreamReceiver(audioID) + + val z = ZelloClient.fromConsumerZello("gtcdevice01","GtcDev2025") z.Start(object : ZelloEvent { override fun onChannelStatus( channel: String, @@ -52,6 +69,28 @@ fun main() { override fun onError(errorMessage: String) { println("Error occurred in Zello client: $errorMessage") } + + override fun onStartStreaming(from: String, For: String, channel: String) { + if (o.Start()){ + println("Opus Receiver ready for streaming from $from for $For on channel $channel") + } else { + println("Failed to start Opus Receiver for streaming from $from for $For on channel $channel") + } + } + + override fun onStopStreaming(from: String, For: String, channel: String) { + o.Stop() + println("Opus Receiver stopped streaming from $from for $For on channel $channel") + } + + override fun onStreamingData( + from: String, + For: String, + channel: String, + data: ByteArray + ) { + if (o.isPlaying) o.PushData(data) + } }) } \ No newline at end of file diff --git a/src/audio/AudioFilePlayer.kt b/src/audio/AudioFilePlayer.kt new file mode 100644 index 0000000..699828f --- /dev/null +++ b/src/audio/AudioFilePlayer.kt @@ -0,0 +1,42 @@ +package audio + +import java.util.function.Consumer + +/** + * Audio Player for playing audio files. + * Supported extensions : .wav, .mp3 + */ +@Suppress("unused") +class AudioFilePlayer(deviceID: Int, val filename: String, device_samplingrate: Int = 48000) { + val bass: Bass = Bass.Instance + var filehandle = 0 + init{ + if (bass.BASS_SetDevice(deviceID)){ + filehandle = bass.BASS_StreamCreateFile(false, filename, 0, 0, 0) + if (filehandle == 0) { + throw Exception("Failed to create stream for file $filename: ${bass.BASS_ErrorGetCode()}") + } + } else throw Exception("Failed to set device $deviceID") + } + + fun Play(finished: Consumer ) : Boolean{ + if (bass.BASS_ChannelPlay(filehandle, false)){ + + val thread = Thread{ + while(true){ + Thread.sleep(1000) + if (bass.BASS_ChannelIsActive(filehandle)!= Bass.BASS_ACTIVE_PLAYING){ + // finished playing + break + } + } + finished.accept(true) + } + thread.name = "AudioFilePlayer $filename" + thread.isDaemon = true + thread.start() + return true + } + return false + } +} \ No newline at end of file diff --git a/src/audio/AudioUtility.kt b/src/audio/AudioUtility.kt new file mode 100644 index 0000000..af789e8 --- /dev/null +++ b/src/audio/AudioUtility.kt @@ -0,0 +1,41 @@ +package audio +import org.slf4j.Logger +import org.slf4j.LoggerFactory + +class AudioUtility { + private val bass = Bass.Instance + private val logger : Logger = LoggerFactory.getLogger(AudioUtility::class.java) + + init{ + logger.info("Bass Version = ${bass.BASS_GetVersion().toHexString()}") + } + + fun DetectPlaybackDevices() : List> { + val result = ArrayList>() + for(i in 0..10){ + val dev = Bass.BASS_DEVICEINFO() + if (bass.BASS_GetDeviceInfo(i, dev)){ + if (dev.flags and Bass.BASS_DEVICE_ENABLED != 0){ + result.add(Pair(i, dev.name)) + } + } + } + return result + } + + fun InitDevice(deviceID: Int, device_samplingrate: Int = 48000) : Boolean { + val dev = Bass.BASS_DEVICEINFO() + if (bass.BASS_GetDeviceInfo(deviceID, dev)) { + if (dev.flags and Bass.BASS_DEVICE_ENABLED > 0){ + if (dev.flags and Bass.BASS_DEVICE_INIT > 0){ + return true // sudah init + } else { + val initflag = Bass.BASS_DEVICE_16BITS or Bass.BASS_DEVICE_MONO + return bass.BASS_Init(deviceID, device_samplingrate,initflag) + } + } + } + return false // gagal GetDeviceInfo + } + +} \ No newline at end of file diff --git a/src/audio/BASSOPUS.java b/src/audio/BASSOPUS.java index ac3e834..97e7577 100644 --- a/src/audio/BASSOPUS.java +++ b/src/audio/BASSOPUS.java @@ -18,18 +18,20 @@ public interface BASSOPUS extends Library { @Structure.FieldOrder({ "version", "channels", "preskip", "inputrate", "gain", "mapping", "streams", "coupled", "chanmap" }) class BASS_OPUS_HEAD extends Structure{ - byte version; - byte channels; - short preskip; - int inputrate; - short gain; - byte mapping; - byte streams; - byte coupled; - byte[] chanmap = new byte[255]; + public byte version; + public byte channels; + public short preskip; + public int inputrate; + public short gain; + public byte mapping; + public byte streams; + public byte coupled; + public byte[] chanmap = new byte[255]; } - int BASS_OPUS_StreamCreate(BASS_OPUS_HEAD head, int flags, Bass.STREAMPROC proc, Pointer user);int BASS_OPUS_StreamCreateFile(boolean mem, String file, long offset, long length, int flags); + int BASS_OPUS_StreamCreate(BASS_OPUS_HEAD head, int flags, Bass.STREAMPROC proc, Pointer user); + int BASS_OPUS_StreamCreate(BASS_OPUS_HEAD head, int flags,Pointer proc, Pointer user); + int BASS_OPUS_StreamCreateFile(boolean mem, String file, long offset, long length, int flags); int BASS_OPUS_StreamCreateURL(String url, int offset, int flags, Bass.DOWNLOADPROC proc, Pointer user); int BASS_OPUS_StreamCreateFileUser(int system, int flags, Bass.BASS_FILEPROCS procs, Pointer user); int BASS_OPUS_StreamPutData(int handle, Pointer buffer, int length); diff --git a/src/audio/Bass.java b/src/audio/Bass.java index c552e71..761e78c 100644 --- a/src/audio/Bass.java +++ b/src/audio/Bass.java @@ -716,6 +716,7 @@ public interface Bass extends Library { boolean BASS_SampleStop(int handle); int BASS_StreamCreate(int freq, int chans, int flags, STREAMPROC proc, Pointer user); + int BASS_StreamCreate(int freq, int chans, int flags, Pointer proc, Pointer user); int BASS_StreamCreateFile(boolean mem, String file, long offset, long length, int flags); int BASS_StreamCreateFile(Pointer file, long offset, long length, int flags); int BASS_StreamCreateURL(String url, int offset, int flags, DOWNLOADPROC proc, Pointer user); @@ -785,8 +786,7 @@ public interface Bass extends Library { boolean BASS_FXGetParameters(int handle, Object params); boolean BASS_FXSetPriority(int handle, int priority); boolean BASS_FXReset(int handle); - // gak bisa - int BASS_StreamCreate(int freq, int chans, int flags, int proc, Pointer user); + } diff --git a/src/audio/OpusStreamReceiver.kt b/src/audio/OpusStreamReceiver.kt new file mode 100644 index 0000000..7e14053 --- /dev/null +++ b/src/audio/OpusStreamReceiver.kt @@ -0,0 +1,77 @@ +package audio + +import com.sun.jna.Memory +import com.sun.jna.Pointer +import org.slf4j.Logger +import org.slf4j.LoggerFactory + +/** + * OpusStreamReceiver is a class that handles receiving Opus audio streams. + * @param deviceID The ID of the audio device to use for playback. + * @param samplingrate The sampling rate for the audio stream, default is 16000 Hz. + * @throws Exception if the device cannot be set. + */ +@Suppress("unused") +class OpusStreamReceiver(deviceID: Int, val samplingrate: Int = 16000) { + private val bass = Bass.Instance + private val bassopus = BASSOPUS.Instance + private var filehandle = 0 + private val logger : Logger = LoggerFactory.getLogger(OpusStreamReceiver::class.java) + var isPlaying = false + + + init{ + if (!bass.BASS_SetDevice(deviceID)){ + throw Exception("Failed to set device $deviceID") + } + } + + /** + * Starts the Opus stream playback. + * @return true if the stream started successfully, false otherwise. + */ + fun Start() : Boolean{ + val opushead = BASSOPUS.BASS_OPUS_HEAD() + opushead.version = 1 + opushead.channels = 1 + opushead.inputrate = samplingrate + val procpush = Pointer(-1) + filehandle = bassopus.BASS_OPUS_StreamCreate(opushead,0, procpush, null) + if (filehandle != 0){ + if (bass.BASS_ChannelPlay(filehandle,false)){ + isPlaying = true + return true + } else logger.error("BASS_ChannelPlay failed for filehandle $filehandle, code ${bass.BASS_ErrorGetCode()}") + } else logger.error("BASS_OPUS_StreamCreate failed, code ${bass.BASS_ErrorGetCode()}") + return false + } + + /** + * Stops the Opus stream playback and frees the resources. + */ + fun Stop(){ + if (filehandle!=0){ + bass.BASS_ChannelStop(filehandle) + bass.BASS_StreamFree(filehandle) + filehandle = 0 + isPlaying = false + } + } + + /** + * Pushes audio data to the Opus stream for playback. + */ + fun PushData(data: ByteArray): Boolean { + if (filehandle == 0 || !isPlaying) return false + if (data.isEmpty()) return false + val buffer = Memory(data.size.toLong()) + buffer.write(0, data, 0, data.size) + val result = bassopus.BASS_OPUS_StreamPutData(filehandle, buffer, data.size) + if (result>=0){ + return true + } else { + logger.error("PushData failed, error code: ${bass.BASS_ErrorGetCode()}") + return false + } + } +} \ No newline at end of file diff --git a/src/zello/StartStreamCommand.kt b/src/zello/StartStreamCommand.kt index ad67d64..b68a120 100644 --- a/src/zello/StartStreamCommand.kt +++ b/src/zello/StartStreamCommand.kt @@ -4,9 +4,8 @@ import com.fasterxml.jackson.annotation.JsonPropertyOrder @Suppress("unused") @JsonPropertyOrder(value=["command","seq","channels","type","codec","codec_header","packet_duration","For"]) -class StartStreamCommand(val seq: Int, val channels: String, val packet_duration: Int=20, val For:String ) { +class StartStreamCommand(val seq: Int, val channels: String, val packet_duration: Int=20, val codec_header: String, val For:String?=null ) { val command: String = "start_stream" val type: String = "audio" val codec: String = "opus" - val codec_header: String ="gD4BPA==" // base64 encoded header for [opus codec header](https://github.com/zelloptt/zello-channel-api/blob/master/API.md#codec_header-attribute) } \ No newline at end of file diff --git a/src/zello/ZelloClient.kt b/src/zello/ZelloClient.kt index 6117863..0d10beb 100644 --- a/src/zello/ZelloClient.kt +++ b/src/zello/ZelloClient.kt @@ -1,13 +1,16 @@ package zello +import com.fasterxml.jackson.module.kotlin.contains import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import org.java_websocket.client.WebSocketClient import org.java_websocket.handshake.ServerHandshake import org.slf4j.Logger import org.slf4j.LoggerFactory +import somecodes.Codes import java.lang.Exception import java.net.URI import java.nio.ByteBuffer +import java.util.function.BiConsumer @Suppress("unused") /** @@ -15,21 +18,21 @@ import java.nio.ByteBuffer * [Source](https://github.com/zelloptt/zello-channel-api/blob/master/API.md) * */ -class ZelloClient(val address : URI) { +class ZelloClient(val address : URI, val username: String, val password: String) { private val streamJob = HashMap() private val imageJob = HashMap() private val commandJob = HashMap() companion object { - fun fromConsumerZello() : ZelloClient { - return ZelloClient(URI.create("wss://zello.io/ws")) + fun fromConsumerZello(username : String, password: String) : ZelloClient { + return ZelloClient(URI.create("wss://zello.io/ws"), username, password) } - fun fromZelloWork(networkName : String) : ZelloClient{ - return ZelloClient(URI.create("wss://zellowork.io/ws/$networkName")) + fun fromZelloWork(username: String, password: String, networkName : String) : ZelloClient{ + return ZelloClient(URI.create("wss://zellowork.io/ws/$networkName"), username, password) } - fun fromZelloEnterpriseServer(serverDomain: String) : ZelloClient{ - return ZelloClient(URI.create("wss://$serverDomain/ws/mesh")) + fun fromZelloEnterpriseServer(username: String, password: String, serverDomain: String) : ZelloClient{ + return ZelloClient(URI.create("wss://$serverDomain/ws/mesh"), username, password) } } @@ -49,19 +52,18 @@ class ZelloClient(val address : URI) { fun Start(event: ZelloEvent){ client = object : WebSocketClient(address) { override fun onOpen(handshakedata: ServerHandshake?) { - logger.info("Connected to $address") + //logger.info("Connected to $address") seqID = 0 inc_seqID() - val lg = LogonCommand.create(seqID,channels, developerKey) + val lg = LogonCommand.create(seqID,channels, developerKey, username, password) val value = mapper.writeValueAsString(lg) - logger.info("Sending LogonCommand: $value") send(value) } override fun onMessage(bytes: ByteBuffer?) { val receivedbytes = bytes?.capacity() ?: 0 - logger.info("Binary message received, length: $receivedbytes") + //logger.info("Binary message received, length: $receivedbytes") if (receivedbytes>0){ when(bytes?.get(0)){ 0x01.toByte() -> { @@ -70,10 +72,13 @@ class ZelloClient(val address : URI) { val str_id = bytes.getInt(1) val packet_id = bytes.getInt(5) val data = ByteArray(receivedbytes - 9) - bytes.get(data, 9, receivedbytes - 9) - logger.info("Audio stream received, stream_id=$str_id, packet_id=$packet_id, data length=${data.size}") + bytes.get(9,data ) + val job = streamJob[str_id] + if (job!=null){ + job.pushAudioData(data) + event.onStreamingData(job.from, job.For?:"", job.channel, data) + } - streamJob[str_id]?.pushAudioData(data) } 0x02.toByte() ->{ // image stream @@ -82,7 +87,7 @@ class ZelloClient(val address : URI) { val image_id = bytes.getInt(1) val image_type = bytes.getInt(5) val data = ByteArray(receivedbytes - 9) - bytes.get(data, 9, receivedbytes - 9) + bytes.get(9, data) logger.info("Image stream received, image_id=$image_id, image_type=$image_type, data length=${data.size}") // Here you can process the image data // image_type 0x01 is full image, 0x02 is thumbnail @@ -113,59 +118,87 @@ class ZelloClient(val address : URI) { logger.info("Message received: $message") val jsnode = mapper.readTree(message) if (jsnode["seq"] != null) { - val seq = jsnode["seq"].asInt() - if (seq == 1){ - //logon reply - val lgreply = mapper.treeToValue(jsnode, LogonReply::class.java) - if (lgreply.success==true){ - logger.info("successfully logged on, refresh_token=${lgreply.refresh_token}") - refresh_token = lgreply.refresh_token - event.onConnected() - } else { - logger.error("Failed to logon: ${lgreply.error ?: "Unknown error"}") + val seq = jsnode.get("seq").asInt() + when(seq){ + 1 ->{ + //logon reply + val lgreply = mapper.treeToValue(jsnode, LogonReply::class.java) + if (lgreply.success==true){ + refresh_token = lgreply.refresh_token + event.onConnected() + } else { + logger.error("Failed to logon: ${lgreply.error ?: "Unknown error"}") + } } - } else { - // other commands - when (jsnode["command"]?.asText()){ - "on_channel_status" -> { - val channelstatus = mapper.treeToValue(jsnode, Event_OnChannelStatus::class.java) - event.onChannelStatus(channelstatus.channel, channelstatus.status, channelstatus.users_online, channelstatus.error, channelstatus.error_type) - } - "on_error" -> { - val error = mapper.treeToValue(jsnode, Event_OnError::class.java) - event.onError(error.error) - } - "on_stream_start" ->{ - val streamstart = mapper.treeToValue(jsnode, Event_OnStreamStart::class.java) - logger.info("Stream started on channel ${streamstart.channel} from ${streamstart.from} for ${streamstart.For}") - streamJob.put(streamstart.stream_id, ZelloAudioJob.fromEventOnStreamStart(streamstart)) - } - "on_stream_stop" -> { - val streamstop = mapper.treeToValue(jsnode, Event_OnStreamStop::class.java) - logger.info("Stream stopped on channel ${streamstop.stream_id}") - val job = streamJob.remove(streamstop.stream_id) - if (job!=null){ - event.onAudioData(job.streamID, job.from, job.For?:"", job.channel, job.getAudioData()) + else ->{ + // other sequence reply + val j = commandJob.remove(seq) + if (j is BiConsumer<*, *>) { + val sucess = jsnode["success"]?.asBoolean() ?: false + if (jsnode.contains("stream_id")){ + // start stream reply + val job = j as BiConsumer + job.accept(sucess, jsnode["stream_id"].asInt()) + } else if (jsnode.contains("image_id")){ + // send image reply + val job = j as BiConsumer + job.accept(sucess, jsnode["image_id"].asInt()) + } else { + // normal success / fail reply + val job = j as BiConsumer + job.accept(sucess,0) } + + + } - "on_image" ->{ - val image = mapper.treeToValue(jsnode, Event_OnImage::class.java) - logger.info("Image received, image_id=${image.message_id}, from=${image.from}, For=${image.For}, channel=${image.channel}") - imageJob.put(image.message_id, ZelloImageJob(image.channel,image.message_id,image.height,image.width,image.source,image.from,image.For)) - } - "on_text_message" ->{ - val textmessage = mapper.treeToValue(jsnode, Event_OnTextMessage::class.java) - event.onTextMessage(textmessage.message_id, textmessage.from, textmessage.For, textmessage.channel, textmessage.text, System.currentTimeMillis()) - } - "on_location" -> { - val location = mapper.treeToValue(jsnode, Event_OnLocation::class.java) - event.onLocation(location.message_id, location.from, location.For, location.channel, location.latitude, location.longitude, location.formatted_address, location.accuracy, System.currentTimeMillis()) - } - else ->{ - // handle other commands - logger.warn("Unknown command: ${jsnode["command"]?.asText()}") + } + } + } else { + // event, tidak ada seq + when (jsnode["command"]?.asText()){ + "on_channel_status" -> { + val channelstatus = mapper.treeToValue(jsnode, Event_OnChannelStatus::class.java) + event.onChannelStatus(channelstatus.channel, channelstatus.status, channelstatus.users_online, channelstatus.error, channelstatus.error_type) + } + "on_error" -> { + val error = mapper.treeToValue(jsnode, Event_OnError::class.java) + event.onError(error.error) + } + "on_stream_start" ->{ + val streamstart = mapper.treeToValue(jsnode, Event_OnStreamStart::class.java) + logger.info("Stream started on channel ${streamstart.channel} from ${streamstart.from} for ${streamstart.For}") + streamJob.put(streamstart.stream_id, ZelloAudioJob.fromEventOnStreamStart(streamstart)) + event.onStartStreaming(streamstart.from, streamstart.For, streamstart.channel) + } + "on_stream_stop" -> { + val streamstop = mapper.treeToValue(jsnode, Event_OnStreamStop::class.java) + logger.info("Stream stopped on channel ${streamstop.stream_id}") + + val job = streamJob.remove(streamstop.stream_id) + + if (job!=null){ + event.onStopStreaming(job.from, job.For?:"", job.channel) + event.onAudioData(job.streamID, job.from, job.For?:"", job.channel, job.getAudioData()) } } + "on_image" ->{ + val image = mapper.treeToValue(jsnode, Event_OnImage::class.java) + logger.info("Image received, image_id=${image.message_id}, from=${image.from}, For=${image.For}, channel=${image.channel}") + imageJob.put(image.message_id, ZelloImageJob(image.channel,image.message_id,image.height,image.width,image.source,image.from,image.For)) + } + "on_text_message" ->{ + val textmessage = mapper.treeToValue(jsnode, Event_OnTextMessage::class.java) + event.onTextMessage(textmessage.message_id, textmessage.from, textmessage.For, textmessage.channel, textmessage.text, System.currentTimeMillis()) + } + "on_location" -> { + val location = mapper.treeToValue(jsnode, Event_OnLocation::class.java) + event.onLocation(location.message_id, location.from, location.For, location.channel, location.latitude, location.longitude, location.formatted_address, location.accuracy, System.currentTimeMillis()) + } + else ->{ + // handle other commands + logger.warn("Unknown command: ${jsnode["command"]?.asText()}") + } } } @@ -195,10 +228,47 @@ class ZelloClient(val address : URI) { client?.connect() } + /** + * Stops the Zello client and closes the WebSocket connection. + */ fun Stop(){ client?.close() } + /** + * Starts a stream on the specified channel with the given sampling rate. + * will raise a callback where Boolean is true if the stream started successfully, and Int is the stream ID. + * Stream ID will be used to stop the stream later. + * @param chanel which channel to stream to + * @param samplingrate the sampling rate for the audio stream + * @param For optional parameter to specify the recipient of the stream + * @param callback a callback to handle the result of the stream start operation + */ + fun StartStream(chanel: String, samplingrate: Int, For: String? = null, callback: BiConsumer){ + val seqID = inc_seqID() + val x = StartStreamCommand( + seqID, + chanel, + 20, + Codes.toCodecHeader(samplingrate,1,60), + For) + client?.send(mapper.writeValueAsString(x)) + // put here, because require a response in onMessage + commandJob.put(seqID, callback) + } + + /** + * Stops a stream on the specified channel with the given stream ID. + * @param channel which channel to stop the stream on + * @param streamID the ID of the stream to stop + */ + fun StopStream(channel: String, streamID: Int){ + val seqID = inc_seqID() + val x = StopStreamCommand(seqID, streamID, channel) + client?.send(mapper.writeValueAsString(x)) + + } + private fun inc_seqID(): Int{ if (seqID < Int.MAX_VALUE) { seqID++ diff --git a/src/zello/ZelloEvent.kt b/src/zello/ZelloEvent.kt index 0980751..504f4ec 100644 --- a/src/zello/ZelloEvent.kt +++ b/src/zello/ZelloEvent.kt @@ -11,4 +11,7 @@ interface ZelloEvent { fun onConnected() fun onDisconnected() fun onError(errorMessage: String) + fun onStartStreaming(from: String, For: String, channel: String) + fun onStopStreaming(from: String, For: String, channel: String) + fun onStreamingData(from: String, For: String, channel: String, data: ByteArray) } \ No newline at end of file