[Bug] EventMeshHttpProducer publish Read timed out #3458

Closed
@mxsm

Description

Search before asking

  • I have searched the issues and found no similar issues.

Environment

Windows

EventMesh version

master

What happened

When I use EventMeshHttpProducer to publish a message, it throws java.net.SocketTimeoutException: Read timed out.

public class HTTP {
    public static void main(String[] args) throws Exception {
        EventMeshHttpClientConfig eventMeshClientConfig = EventMeshHttpClientConfig.builder()
            .liteEventMeshAddr("192.168.1.4:10105")
            .producerGroup("TEST_PRODUCER_GROUP")
            .env("DEV")
            .idc("idc")
            .ip(IPUtils.getLocalAddress())
            .sys("1234")
            .pid(String.valueOf(ThreadUtils.getPID()))
            .userName("eventmesh")
            .password("password")
            .build();

        EventMeshHttpProducer eventMeshHttpProducer = new EventMeshHttpProducer(eventMeshClientConfig);
        Map<String, String> content = new HashMap<>();
        content.put("content", "testAsyncMessage----------------------");

        CloudEvent event = CloudEventBuilder.v1()
            .withId(UUID.randomUUID().toString())
            .withSubject("eventmesh-async-topic")
            .withSource(URI.create("/"))
            .withDataContentType("application/cloudevents+json")
            .withType(EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME)
            .withData(JsonUtils.toJSONString(content).getBytes(StandardCharsets.UTF_8))
            .withExtension(Constants.EVENTMESH_MESSAGE_CONST_TTL, String.valueOf(4 * 1000))
            .build();
        for(int i = 0; i < 10; ++i){
            eventMeshHttpProducer.publish(event);
        }
    }
}

How to reproduce

public class HTTP {
    public static void main(String[] args) throws Exception {
        EventMeshHttpClientConfig eventMeshClientConfig = EventMeshHttpClientConfig.builder()
            .liteEventMeshAddr("192.168.1.4:10105")
            .producerGroup("TEST_PRODUCER_GROUP")
            .env("DEV")
            .idc("idc")
            .ip(IPUtils.getLocalAddress())
            .sys("1234")
            .pid(String.valueOf(ThreadUtils.getPID()))
            .userName("eventmesh")
            .password("password")
            .build();

        EventMeshHttpProducer eventMeshHttpProducer = new EventMeshHttpProducer(eventMeshClientConfig);
        Map<String, String> content = new HashMap<>();
        content.put("content", "testAsyncMessage----------------------");

        CloudEvent event = CloudEventBuilder.v1()
            .withId(UUID.randomUUID().toString())
            .withSubject("eventmesh-async-topic")
            .withSource(URI.create("/"))
            .withDataContentType("application/cloudevents+json")
            .withType(EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME)
            .withData(JsonUtils.toJSONString(content).getBytes(StandardCharsets.UTF_8))
            .withExtension(Constants.EVENTMESH_MESSAGE_CONST_TTL, String.valueOf(4 * 1000))
            .build();
        for (int i = 0; i < 10; ++i) {
            eventMeshHttpProducer.publish(event);
        }
    }
}

Debug logs

client exception

2023-03-16 22:59:54,851 DEBUG [main] HttpUtils(HttpUtils.java:112) - POST http://192.168.1.4:10105 HTTP/1.1
Exception in thread "main" org.apache.eventmesh.common.exception.EventMeshException: Publish message error, target:http://192.168.1.4:10105
	at org.apache.eventmesh.client.http.AbstractProducerHttpClient.publish(AbstractProducerHttpClient.java:56)
	at org.apache.eventmesh.client.http.producer.EventMeshHttpProducer.publish(EventMeshHttpProducer.java:47)
	at org.example.objectsize.HTTP.main(HTTP.java:48)
Caused by: java.net.SocketTimeoutException: Read timed out
	at java.base/java.net.SocketInputStream.socketRead0(Native Method)
	at java.base/java.net.SocketInputStream.socketRead(SocketInputStream.java:115)
	at java.base/java.net.SocketInputStream.read(SocketInputStream.java:168)
	at java.base/java.net.SocketInputStream.read(SocketInputStream.java:140)
	at org.apache.http.impl.io.SessionInputBufferImpl.streamRead(SessionInputBufferImpl.java:137)
	at org.apache.http.impl.io.SessionInputBufferImpl.fillBuffer(SessionInputBufferImpl.java:153)
	at org.apache.http.impl.io.SessionInputBufferImpl.readLine(SessionInputBufferImpl.java:280)
	at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:138)
	at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:56)
	at org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:259)
	at org.apache.http.impl.DefaultBHttpClientConnection.receiveResponseHeader(DefaultBHttpClientConnection.java:163)
	at org.apache.http.impl.conn.CPoolProxy.receiveResponseHeader(CPoolProxy.java:157)
	at org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:273)
	at org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:125)
	at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:272)
	at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186)
	at org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:89)
	at org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:110)
	at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
	at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:72)
	at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:221)
	at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:165)
	at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:140)
	at org.apache.eventmesh.client.http.util.HttpUtils.post(HttpUtils.java:115)
	at org.apache.eventmesh.client.http.util.HttpUtils.post(HttpUtils.java:65)
	at org.apache.eventmesh.client.http.util.HttpUtils.post(HttpUtils.java:57)
	at org.apache.eventmesh.client.http.AbstractProducerHttpClient.publish(AbstractProducerHttpClient.java:50)
	... 2 more
2023-03-16 23:00:09,885 pool-1-thread-1 DEBUG Stopping LoggerContext[name=2437c6dc, org.apache.logging.log4j.core.LoggerContext@a202ccb]
2023-03-16 23:00:09,886 pool-1-thread-1 DEBUG Stopping LoggerContext[name=2437c6dc, org.apache.logging.log4j.core.LoggerContext@a202ccb]...
2023-03-16 23:00:09,888 pool-1-thread-1 DEBUG Shutting down OutputStreamManager SYSTEM_OUT.false.false
2023-03-16 23:00:09,888 pool-1-thread-1 DEBUG OutputStream closed
2023-03-16 23:00:09,888 pool-1-thread-1 DEBUG Shut down OutputStreamManager SYSTEM_OUT.false.false, all resources released: true
2023-03-16 23:00:09,889 pool-1-thread-1 DEBUG Appender console stopped with status true
2023-03-16 23:00:09,889 pool-1-thread-1 DEBUG Log4j2 ConfigurationScheduler shutting down threads in java.util.concurrent.ScheduledThreadPoolExecutor@1e78a3f9[Running, pool size = 1, active threads = 0, queued tasks = 1, completed tasks = 0]
2023-03-16 23:00:09,889 pool-1-thread-1 DEBUG Stopped XmlConfiguration[location=jar:file:/E:/develop/mavenRepository/org/apache/eventmesh/eventmesh-sdk-java/1.8.1-SNAPSHOT/eventmesh-sdk-java-1.8.1-SNAPSHOT.jar!/log4j2.xml] OK
2023-03-16 23:00:09,889 pool-1-thread-1 DEBUG Stopped LoggerContext[name=2437c6dc, org.apache.logging.log4j.core.LoggerContext@a202ccb] with status true
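
The client trace above stops in receiveResponseHeader, which is what an HTTP client reports when the server accepts the request but never writes a response. A minimal JDK-only sketch of that behaviour follows. It is not EventMesh code: the silent local server, the 500 ms timeout, and the class name ReadTimeoutDemo are assumptions made for illustration.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class ReadTimeoutDemo {

    /**
     * POSTs to a local server that accepts the connection but never replies,
     * and returns the message of the resulting SocketTimeoutException.
     */
    static String postToSilentServer(int readTimeoutMillis) throws IOException {
        try (ServerSocket server = new ServerSocket(0)) {
            // Accept the connection and then stay silent (hypothetical stand-in
            // for a handler that fails before writing a response).
            Thread acceptor = new Thread(() -> {
                try (Socket s = server.accept()) {
                    Thread.sleep(readTimeoutMillis * 4L);
                } catch (IOException | InterruptedException ignored) {
                    // Nothing to do: the demo only needs the server to stay quiet.
                }
            });
            acceptor.setDaemon(true);
            acceptor.start();

            URL url = new URL("http://127.0.0.1:" + server.getLocalPort() + "/");
            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
            conn.setRequestMethod("POST");
            conn.setDoOutput(true);
            conn.setConnectTimeout(readTimeoutMillis);
            conn.setReadTimeout(readTimeoutMillis);
            try (OutputStream out = conn.getOutputStream()) {
                out.write("{}".getBytes(StandardCharsets.UTF_8));
            }
            try {
                conn.getResponseCode(); // blocks waiting for the status line
                return "unexpected response";
            } catch (SocketTimeoutException e) {
                return e.getMessage();
            } finally {
                conn.disconnect();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(postToSilentServer(500));
    }
}
```

If this matches what happens here, raising the client timeout would only delay the error; the response is missing because of a failure on the runtime side.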

runtime exception

2023-03-16 22:59:54,875 DEBUG [eventMesh-http-worker-2] AbstractHTTPServer(AbstractHTTPServer.java:440) - httpCommand={REQ,POST/HTTP,requestCode=104,opaque=3,header=sendMessageRequestHeader={code=104,language=JAVA,version=V1,env=DEV,idc=idc,sys=1234,pid=10988,ip=192.168.1.4:64535,username=eventmesh,passwd=password},body=sendMessageRequestBody={topic=null,bizSeqNo=null,uniqueId=null,content={"specversion":"1.0","id":"f3801022-0431-407a-bb2f-a9e1fa230cba","source":"/","type":"cloudevents","datacontenttype":"application/cloudevents+json","subject":"eventmesh-async-topic","protocolversion":"1.0","ip":"192.168.43.1","idc":"idc","protocoldesc":"V1","bizseqno":"927518096792131111260710628736","pid":"10988","language":"JAVA","env":"DEV","sys":"1234","ttl":"4000","uniqueid":"807133169894333271230374891000","data_base64":"eyJjb250ZW50IjoidGVzdEFzeW5jTWVzc2FnZS0tLS0tLS0tLS0tLS0tLS0tLS0tLS0ifQ=="},ttl=null,tag=,producerGroup=TEST_PRODUCER_GROUP,extFields=null}}
2023-03-16 22:59:55,038 INFO  [eventMesh-sendMsg-1] cmd(SendAsyncMessageProcessor.java:89) - cmd=MSG_SEND_ASYNC|http|client2eventMesh|from=192.168.1.4:64535|to=192.168.43.1
2023-03-16 22:59:55,038 INFO  [eventMesh-sendMsg-1] EventMeshExtensionFactory(EventMeshExtensionFactory.java:92) - initialize extension instance success, extensionType: interface org.apache.eventmesh.protocol.api.ProtocolAdaptor, extensionInstanceName: cloudevents
2023-03-16 22:59:55,044 DEBUG [pool-4-thread-1] ConsumerManager(ConsumerManager.java:214) - grpc client info check
2023-03-16 22:59:55,044 DEBUG [pool-4-thread-1] ConsumerManager(ConsumerManager.java:223) - total number of ConsumerGroupClients: 0
2023-03-16 22:59:55,104 ERROR [eventMesh-sendMsg-1] AbstractHTTPServer(AbstractHTTPServer.java:563) - process error
java.lang.NullPointerException: null
	at java.util.Hashtable.put(Hashtable.java:460) ~[?:1.8.0_202]
	at org.apache.eventmesh.runtime.core.protocol.http.producer.EventMeshProducer.init(EventMeshProducer.java:89) ~[classes/:?]
	at org.apache.eventmesh.runtime.core.protocol.http.producer.ProducerManager.createEventMeshProducer(ProducerManager.java:94) ~[classes/:?]
	at org.apache.eventmesh.runtime.core.protocol.http.producer.ProducerManager.getEventMeshProducer(ProducerManager.java:53) ~[classes/:?]
	at org.apache.eventmesh.runtime.core.protocol.http.processor.SendAsyncMessageProcessor.processRequest(SendAsyncMessageProcessor.java:212) ~[classes/:?]
	at org.apache.eventmesh.runtime.boot.AbstractHTTPServer$HTTPHandler.lambda$processEventMeshRequest$1(AbstractHTTPServer.java:548) ~[classes/:?]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_202]
	at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) [?:1.8.0_202]
	at java.util.concurrent.FutureTask.run(FutureTask.java) [?:1.8.0_202]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_202]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_202]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_202]
2023-03-16 22:59:55,653 DEBUG [StandaloneConsumerThread-1] SubScribeTask(SubScribeTask.java:57) - execute subscribe task, topic: eventmesh-async-topic, offset: null
2023-03-16 22:59:56,668 DEBUG [StandaloneConsumerThread-1] SubScribeTask(SubScribeTask.java:57) - execute subscribe task, topic: eventmesh-async-topic, offset: null
2023-03-16 22:59:57,672 DEBUG [StandaloneConsumerThread-1] SubScribeTask(SubScribeTask.java:57) - execute subscribe task, topic: eventmesh-async-topic, offset: null
2023-03-16 22:59:58,682 DEBUG [StandaloneConsumerThread-1] SubScribeTask(SubScribeTask.java:57) - execute subscribe task, topic: eventmesh-async-topic, offset: null
2023-03-16 22:59:59,683 DEBUG [StandaloneConsumerThread-1] SubScribeTask(SubScribeTask.java:57) - execute subscribe task, topic: eventmesh-async-topic, offset: null
2023-03-16 23:00:00,698 DEBUG [StandaloneConsumerThread-1] SubScribeTask(SubScribeTask.java:57) - execute subscribe task, topic: eventmesh-async-topic, offset: null
2023-03-16 23:00:01,709 DEBUG [StandaloneConsumerThread-1] SubScribeTask(SubScribeTask.java:57) - execute subscribe task, topic: eventmesh-async-topic, offset: null
2023-03-16 23:00:02,711 DEBUG [StandaloneConsumerThread-1] SubScribeTask(SubScribeTask.java:57) - execute subscribe task, topic: eventmesh-async-topic, offset: null
2023-03-16 23:00:03,720 DEBUG [StandaloneConsumerThread-1] SubScribeTask(SubScribeTask.java:57) - execute subscribe task, topic: eventmesh-async-topic, offset: null
2023-03-16 23:00:04,734 DEBUG [StandaloneConsumerThread-1] SubScribeTask(SubScribeTask.java:57) - execute subscribe task, topic: eventmesh-async-topic, offset: null
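
The runtime trace ends in java.util.Hashtable.put, which throws NullPointerException when either the key or the value is null (java.util.Properties inherits this behaviour). The logs do not show which argument was null at EventMeshProducer.java:89, so the sketch below only illustrates the mechanism. The key name producerGroup and the putChecked helper are hypothetical and are not taken from the EventMesh source.

```java
import java.util.Hashtable;
import java.util.Map;

public class HashtableNullDemo {

    /** Returns true if table.put(key, value) throws NullPointerException. */
    static boolean putThrowsNpe(Map<String, String> table, String key, String value) {
        try {
            table.put(key, value);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    /**
     * One possible guard (an assumption, not the project's fix): fail with a
     * message naming the missing setting instead of a bare NullPointerException.
     */
    static void putChecked(Hashtable<String, String> table, String key, String value) {
        if (value == null) {
            throw new IllegalArgumentException("missing value for '" + key + "'");
        }
        table.put(key, value);
    }

    public static void main(String[] args) {
        Hashtable<String, String> table = new Hashtable<>();
        System.out.println(putThrowsNpe(table, "producerGroup", null));  // null value
        System.out.println(putThrowsNpe(table, null, "TEST_PRODUCER_GROUP")); // null key
        System.out.println(putThrowsNpe(table, "producerGroup", "TEST_PRODUCER_GROUP"));
        try {
            putChecked(table, "producerGroup", null);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

A guard like this would not by itself return an HTTP response to the client; the "process error" path in AbstractHTTPServer would presumably still need to write an error reply so that the producer does not wait until its read timeout.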

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
