Hi, I am working with the CoAP protocol for an IoT application, and I have set it up as a Spark Streaming source.
I simply created a resource and added it to my server as an observable resource, then started observing it through my client.
It fetches the data perfectly, but it also throws the following exception:
15/10/14 13:49:48 ERROR BlockGenerator: Error in block pushing thread
java.io.NotSerializableException: org.eclipse.californium.core.CoapObserveRelation
Serialization stack:
- object not serializable (class: org.eclipse.californium.core.CoapObserveRelation, value: org.eclipse.californium.core.CoapObserveRelation@25d5cb65)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:153)
at org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1189)
at org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:1198)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:850)
at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:637)
at org.apache.spark.streaming.receiver.BlockManagerBasedBlockHandler.storeBlock(ReceivedBlockHandler.scala:77)
at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushAndReportBlock(ReceiverSupervisorImpl.scala:141)
at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushArrayBuffer(ReceiverSupervisorImpl.scala:112)
at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl$$anon$2.onPushBlock(ReceiverSupervisorImpl.scala:97)
at org.apache.spark.streaming.receiver.BlockGenerator.pushBlock(BlockGenerator.scala:198)
at org.apache.spark.streaming.receiver.BlockGenerator$$anon$1.run(BlockGenerator.scala:89)
Does anyone have an idea what causes this?
This is the custom receiver I use to make my client stream the data:
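    import org.apache.spark.Logging
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.receiver.Receiver
    import org.eclipse.californium.core.{CoapClient, CoapHandler, CoapObserveRelation, CoapResponse}

    class customReceiver(test: String) extends Receiver[CoapObserveRelation](StorageLevel.MEMORY_AND_DISK_2) with Logging {
      @volatile private var stopped = false

      override def onStart() {
        val client = new CoapClient("coap://localhost/temp")
        // observe() returns the relation handle, not the observed data
        var response = client.observe(new CoapHandler() {
          override def onLoad(response: CoapResponse) {
          }
          override def onError() {
            System.err.println("Failed")
          }
        })
        response.proactiveCancel();
        // the CoapObserveRelation itself is handed to Spark here
        store(response)
      }
    }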
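For what it's worth, the trace points at Java serialization of the stored item (JavaSerializationStream.writeObject), so the failure can probably be reproduced without Spark or Californium. Below is a minimal sketch using only the JDK. `FakeObserveRelation` and `SerializationCheck` are hypothetical names made up for illustration, and the stand-in class only mimics the one property that matters here: it does not implement java.io.Serializable.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for CoapObserveRelation (NOT the Californium class):
// a plain class that does not implement java.io.Serializable.
class FakeObserveRelation(val uri: String)

object SerializationCheck {
  // Returns true if Java serialization accepts the value, and false if it
  // throws NotSerializableException, the exception shown in the log above.
  def canSerialize(value: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(value)
      true
    } catch {
      case _: NotSerializableException => false
    } finally {
      out.close()
    }
  }
}
```

With this sketch, `SerializationCheck.canSerialize(new FakeObserveRelation("coap://localhost/temp"))` is false, while `SerializationCheck.canSerialize("21.5")` is true, since a String is serializable. If that matches what happens inside store(), then storing the response text from onLoad instead of the relation object might be one direction to try, though that is an assumption and not something I have verified.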
Any help is much appreciated :)
Thanks