kafka-component.mock
accumulate-messages
(accumulate-messages consumer topic options)
Like accumulate-subscribed-messages, but subscribes to the topic first.
accumulate-subscribed-messages
(accumulate-subscribed-messages consumer {:keys [timeout at-least-n format-fn filter-fn], :or {at-least-n 1, format-fn identity, filter-fn identity}})
Helper to consume messages off any existing subscriptions. Returns as soon as at-least-n
messages have been fetched. May return more, since an individual poll can fetch more than one message.
If timeout (in ms) expires first, returns any messages fetched so far. Messages may be reformatted
with format-fn, then filtered with filter-fn. Messages excluded by filter-fn do not count towards at-least-n.
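The counting rule can be modeled without Kafka. The sketch below is not the library's implementation: accumulate-sketch and its batches argument (a vector of vectors standing in for successive polls) are hypothetical, and the timeout is left out. It applies format-fn, then filter-fn, and stops once at least at-least-n messages have survived the filter.

```clojure
;; Hypothetical model of the accumulation rule; not kafka-component code.
;; `batches` stands in for the results of successive polls.
(defn accumulate-sketch
  [batches {:keys [at-least-n format-fn filter-fn]
            :or   {at-least-n 1, format-fn identity, filter-fn identity}}]
  (reduce (fn [acc batch]
            ;; Reformat first, then filter; only survivors are kept and counted.
            (let [acc (into acc (comp (map format-fn) (filter filter-fn)) batch)]
              (if (>= (count acc) at-least-n)
                (reduced acc) ; enough messages: stop "polling"
                acc)))
          []
          batches))

(def batches [[{:value 1} {:value 2}] [{:value 3} {:value 4}] [{:value 5}]])

;; A whole batch is kept, so more than at-least-n messages can come back.
(accumulate-sketch batches {})
;; => [{:value 1} {:value 2}]

;; Filtered-out messages do not count, so a second batch is needed.
(accumulate-sketch batches {:at-least-n 2, :format-fn :value, :filter-fn odd?})
;; => [1 3]
```

In this model, running out of batches plays the role of the timeout: whatever has been accumulated so far is returned.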
assert-producer-not-closed
(assert-producer-not-closed producer-state)
Checks conn-open? in producer state
assign-partitions
(assign-partitions broker-state consumers participants participants-ch complete-ch)
get-messages
deprecated in 0.5.10
(get-messages consumer topic timeout)
(get-messages consumer timeout)
DEPRECATED: use txfm-messages instead. Poll until consumer receives some messages on topic.
If timeout (in ms) expires first, return an empty vector.
IRebalance
protocol
members
all-topics
(all-topics this)
apply-pending-topics
(apply-pending-topics this topics)
clean-up-subscriptions
(clean-up-subscriptions this)
rebalance-participants
(rebalance-participants broker-state consumers participants-ch complete-ch)
Try to get all the consumers to participate in the rebalance, but if they don’t all check in, continue without some of them.
txfm-messages
(txfm-messages consumer topic xf)
(txfm-messages consumer topic xf options)
Like txfm-subscribed-messages, but subscribes to the topic first.
txfm-subscribed-messages
(txfm-subscribed-messages consumer xf)
(txfm-subscribed-messages consumer xf {:keys [timeout ixf rf], :or {timeout 1000, ixf cat, rf conj}})
Helper to transform messages sent to the consumer on any existing subscriptions. Messages are
transformed by the transducing function xf. Returns as soon as xf indicates it has enough records,
or when the timeout (in ms, 1000 by default) expires. Thus, to avoid waiting for the timeout,
include e.g. (take 3) in xf. If the timeout expires, returns any messages transformed so far.
For example,
(kafka-mock/send producer "topic" "key" "5")
(kafka-mock/send producer "topic" "key" "6")
(kafka-mock/send producer "topic" "key" "7")
(kafka-mock/send producer "topic" "key" "8")
(.subscribe consumer ["topic"])
(kafka-mock/txfm-subscribed-messages consumer (comp (map :value)
                                                    (map #(Integer/parseInt %))
                                                    (filter even?)
                                                    (take 1)))
;; => [6]
By default, messages retrieved from (.poll consumer) are collected as by clojure.core/cat
and fed one by one through xf. This behavior can be altered by providing a different ixf.
For example, clojure.core/conj feeds each batch through in its entirety. This can be combined
with max.poll.records to control batch size.
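The difference between the two ixf choices can be seen with plain clojure.core/transduce. This is only a model, assuming ixf is composed in front of xf; batches is a hypothetical stand-in for the results of successive (.poll consumer) calls. One-argument conj returns its argument unchanged, so in transducer position it acts as a pass-through.

```clojure
;; Hypothetical stand-in for the results of two polls.
(def batches [[1 2 3] [4 5]])

;; ixf = cat: each batch is unrolled, so xf sees one record at a time.
(def one-by-one
  (transduce (comp cat (map inc)) conj [] batches))
;; => [2 3 4 5 6]

;; ixf = conj: (conj rf) returns rf unchanged, so xf sees whole batches.
(def batch-at-a-time
  (transduce (comp conj (map count)) conj [] batches))
;; => [3 2]
```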
Also by default, the transformed messages are accumulated as by clojure.core/conj and returned.
A different reducing function rf can be supplied. For example, if xf produces a stream of numbers,
the reducing function clojure.core/+ returns their sum.
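The effect of swapping rf can likewise be sketched with clojure.core/transduce alone. As before, this is a model rather than the library's code, and batches is a hypothetical stand-in for polled batches whose values are already numbers.

```clojure
;; Hypothetical stand-in for polled batches of already-parsed numbers.
(def batches [[5 6] [7 8]])

;; rf = conj (the documented default): results are collected.
(def collected
  (transduce (comp cat (filter even?)) conj [] batches))
;; => [6 8]

;; rf = +: results are summed instead; (+) supplies the initial value 0.
(def total
  (transduce (comp cat (filter even?)) + batches))
;; => 14
```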
with-test-producer-consumer
macro
(with-test-producer-consumer producer-name consumer-name & body)