这个问题研究了两天,最后被一句话点醒。在此记录一下整个问题的思考过程。

在上一篇博客中实现了一个Avro序列化器https://blog.csdn.net/QYHuiiQ/article/details/88723584,但是在实现的过程中producer里使用send(record)这种最简单的方式发送消息时一直没有在topic中读取到这条消息(现在依旧可以看到上一篇中producer是没有close的)。也就是说我刚开始使用send(record)并且没有producer.close(),结果就是没有发送消息到topic中。所以我就开始按照往常一样开始debug,看执行到哪里有问题,但是神奇的是在debug的时候消息发出去了。。。我诧异为什么run 和debug的结果会不一样,就这个问题看了一下网上普遍说的都是这两种模式的运行结果是一样的,不可能不一样。。。这时我就有了一个疑问,run和debug在执行中的原理到底是什么,为什么会导致我的程序两次运行结果不一样。在思考这个问题的同时,结合我的代码,我觉得问题就应该是在run的时候send(record)到底有没有执行,因为我怀疑在run的时候send()执行有问题。但是在run的时候你又没办法跟踪状态,但是又想知道send(record)给我们提供更多的执行信息,于是就想到了kafka提供的同步send()方法,就是send(reocrd).get(),得到一个send之后返回的信息,结果发现在这种情况下run模式也发送消息了。那么问题就集中在下面几个方面:

1.send(record)底层原理是什么。

2.为什么send(reocrd)在debug下可以发送消息,在run下不可以发送消息。

3.为什么在之前的案例中使用send(reocrd)就可以发送消息,而在这个案例中就不可以发送消息。

4.Future send = producer.send(record); 测试了一下在异步时获得返回值但是不get()的时候也是没有发送消息的。但是在debug的时候也可以发送消息。

5.producer.send(record); run不会发送。

6.producer.send(record).get(); run会发送。

7.RecordMetadata recordMetadata = producer.send(record).get(); run 会发送。

在网上看到一个说明:如果Future调用get(),则将阻塞,直到相关请求完成并返回该消息的metadata或抛送异常。所以结合上面说的,猜测应该与同步异步有关系,所以就考虑run、debug与同步异步,阻塞、非阻塞有什么关系。

在看send(record)源码的时候发现,其实send(record)底层调用的就是异步发送的send(record, callback),只不过是自动将callback置为null了,所以send(record)也是异步发送的。

在看Producer源码的过程中,可以发现每个kafka producer在new 的时候都会创建一个backgroud的ioThread,每个producer都有一个ioThread。实际上,producer的send()并不是直接发送消息到broker上的,而是在执行send()时把消息发送到了内存(Accumulator)中,然后后端的IOThread会一直扫描这个缓冲池中的消息,也就是这个线程负责真正地把消息发送到broker中。每一个Producer都是由一个持有未发送消息的资源池和一个用来向kafka集群发送消息记录的后台IOThread组成。使用后未关闭producer将导致这些资源泄漏。

producer中在new的时候的源码,创建后台IOThread:

send()中将消息放到accumulator:

其实是把消息放到了deque(一个双向队列):

借鉴一篇前辈的分析,推荐给大家:

http://generalthink.github.io/2019/03/07/kafka-producer-source-code-analysis/#group

图源上述链接。

在此过程中还猜测到:在get()的时候有结果是因为同步的阻塞要即时得到结果,而在debug的时候有结果,run的时候没有结果,那会不会是debug的时候也是着急得到结果返回,而在run的时候不着急得到结果返回所以就一直没发送消息。所以就猜想会不会是积累到一定量的时候才会发送消息,那就试一下while(true)发送大量消息,果然,在while(true)时send(record)都发送出去了。但是如果只执行一次send()就不可以发送,而且在循环发送大量数据之前发送的单条数据最终在topic里也没有读到。(其实这个时候就应该换个思路了,最后才发现。。。)那么这里的问题就在于循环发送大量消息和只发送一条消息的区别可能就是发送多条消息时每一次循环send之后都是一些时间,那我就想到了在发送单条消息的时候可以让线程睡眠一会儿,所以我就在send(record)之后,Thread.sleep(1000),结果是可以发送消息。那么就猜想send之后睡眠会不会又是在等send的结果,就又类似同步了,所以就把线程睡眠放到了send()之前,结果就不会发送数据了。这样就猜想会不会是在send之后如果还有什么其它语句要执行,就会发送成功,就想到去看了一下之前的案例中send之后有没有执行什么,发现之前的代码在send之后还有一句producer.close(),然后测试了一下之前的案例把close()去掉,结果也不会发送消息了。然后又测试了在本案例中加上producer.close()也可以发送单条数据了。

上面这几种例子:循环发送数据、send后睡眠、get()这几情况目前看来好像是在send之后又执行了一些语句就可以发送消息,但是刚才也有一个情况是debug的时候也会发送消息。所以想总结出来他们的共同点,好像也没有什么共同点。然后又想到官网上面说在使用producer时,必须调用close(),否则会造成资源泄漏,消息丢失。

为什么在没有close的时候,睡眠一会儿也是可以的,所以就想线程睡眠和close之间有什么共同点,线程睡眠可能是做了什么与close一样的事。所以就去看源码close到底干了什么:

源码中注释说close方法会一直阻塞,直到之前的所有发送请求全部完成。close()时,关闭producer对象,主要操作是设置close标志,等待RecordAccumulator中的消息清空,关闭Sender线程(负责发送到Broker的线程)。

后来突然发现上面的这些情况,不是因为死循环、睡眠、debug的时候是同步的发送消息,而是这些情况下消息压根就没有发出去,因为在topic中始终没有读到消息(其实这一点在最开始就应该想到,与发送方式无关,因为不管哪种发送方式,最后都应该有消息才对。)。这就是为什么官网说要close防止资源泄漏,数据丢失。

在思考这个问题的过程中又看了网上的一些说法:

按照网友的案例,我试着把sleep的时间改小,结果我的改为10ms之后,也不会发送消息了。看来这与线程是否sleep没有关系,和时间有关系。用二分法测出来了临界值是15ms一下,就发送不出去消息了。

这好像是个很有用的线索。。。但是依然不知道为什么啊,就继续看网上的信息。直到看到这篇评论中这句话,一切好像都解释的通了(https://stackoverflow.com/questions/50873413/why-producer-sendrecord-get-works-but-producer-sendrecord-callback-does-n)。

这位网友提到如果我们的主线程立刻退出了,而后端的IOThread还没来得及从队列中获取消息并发送给broker,这就导致了消息丢失了。直到看到这句话,才恍然大悟,尝试着解释刚才测试的几种案例,在线程睡眠一定时间时可以发送消息是因为这段时间io线程可以把消息读取并发送到broker中;在denug时由于中间一步一步执行,比run模式运行的慢一些,也给了后台线程一定的时间去读取并发送消息,循环发送大量数据时每一条数据发送后也都会继续运行使io线程有时间读取发送消息,而close就是为了防止主线程很快停掉导致数据丢失,所以官网要求我们必须用close就是因为close方法是阻塞的,直到消息全部发送出去,保证了消息不丢失。至此,找出了为什么在某些情况下可以发送消息,有些情况下不可以发送消息的疑惑。

在这中间,也加深了对一些东西的深入理解。但仍然是浅层次的理解,在以后的开发中还是要多思考原理。。。

原文链接:https://blog.csdn.net/QYHuiiQ/article/details/88757209


愿你出走半生,归来仍是少年