kafka-component.mock

->topic-partition

(->topic-partition topic partition)

accumulate-messages

(accumulate-messages consumer topic options)

Like accumulate-subscribed-messages, but subscribes to the topic first.

accumulate-subscribed-messages

(accumulate-subscribed-messages consumer {:keys [timeout at-least-n format-fn filter-fn], :or {at-least-n 1, format-fn identity, filter-fn identity}})

Helper to consume messages off any existing subscriptions. Returns as soon as at-least-n messages have been fetched; it may return more, since a single poll can fetch several messages. If timeout (in ms) expires first, returns any messages fetched so far. Messages may be reformatted with format-fn and then filtered with filter-fn. Messages excluded by filter-fn do not count towards at-least-n.
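The stopping rule can be illustrated without a broker. The following is a minimal sketch under stated assumptions, not the library's implementation: accumulate-batches and the batches data are hypothetical, each inner vector stands in for the result of one poll, and the timeout is left out (running out of batches plays the role of the timeout expiring).

```clojure
;; Hypothetical stand-in for the accumulation logic. It works on a
;; pre-recorded sequence of poll batches instead of a real consumer.
(defn accumulate-batches
  [batches {:keys [at-least-n format-fn filter-fn]
            :or {at-least-n 1 format-fn identity filter-fn identity}}]
  (reduce (fn [acc batch]
            ;; Format first, then filter, as the docstring describes.
            (let [acc (into acc (comp (map format-fn) (filter filter-fn)) batch)]
              ;; Only messages that survived filter-fn are counted.
              (if (>= (count acc) at-least-n)
                (reduced acc)
                acc)))
          []
          batches))

;; Made-up data: three polls returning 2, 3, and 1 messages.
(def batches [[{:value "1"} {:value "2"}]
              [{:value "3"} {:value "4"} {:value "5"}]
              [{:value "6"}]])

(def result
  (accumulate-batches batches {:at-least-n 2
                               :format-fn #(Integer/parseInt (:value %))
                               :filter-fn odd?}))
;; => [1 3 5]
```

The first batch contributes only one odd value, so a second batch is consumed. That batch adds two more, which is why three messages come back even though at-least-n is 2.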

add-record-in-broker-state

(add-record-in-broker-state state producer-record)

add-record-to-topic

(add-record-to-topic state producer-record)

assert-producer-not-closed

(assert-producer-not-closed producer-state)

Checks conn-open? in the producer state.

assert-proper-record

(assert-proper-record record)

assign-partitions

(assign-partitions broker-state consumers participants participants-ch complete-ch)

broker-create-topic

(broker-create-topic)
(broker-create-topic num-partitions)

broker-ensure-topic

(broker-ensure-topic broker-state topic)

broker-receive-messages

(broker-receive-messages state msg-ch)

broker-save-record!

(broker-save-record! state record)

broker-state

buffer-size

close-all-from

(close-all-from ch)

close-mock

(close-mock state)

committed-offsets

committed-record-metadata

(committed-record-metadata record)

consumer-backoff

consumer-coordinator

(consumer-coordinator state broker-state join-ch leave-ch)

consumer-rebalance-timeout

consumer-unsubscribe-timeout

consumers-with-topic-overlap

(consumers-with-topic-overlap consumers topics)

debug

debug!

(debug! enable)

default-mock-consumer-opts

default-num-partitions

fixture-restart-broker!

(fixture-restart-broker! f)

get-messages

deprecated in 0.5.10

(get-messages consumer topic timeout)
(get-messages consumer timeout)

DEPRECATED: use txfm-messages instead. Poll until consumer receives some messages on topic. If timeout (in ms) expires first, return an empty vector.

get-offset

(get-offset broker-state topic partition config)

goe

macro

(goe & body)

goe-loop

macro

(goe-loop & body)

intersects?

(intersects? v1 v2)

IRebalance

protocol

members

all-topics

(all-topics this)

apply-pending-topics

(apply-pending-topics this topics)

clean-up-subscriptions

(clean-up-subscriptions this)

logger

(logger & args)

max-poll-records

(max-poll-records config)

mock-consumer

(mock-consumer config)
(mock-consumer auto-subscribe-topics config)

mock-producer

(mock-producer config)

noop-cb

producer-record->consumer-record

(producer-record->consumer-record offset record)

read-offsets

(read-offsets grouped-messages)

rebalance-consumers

(rebalance-consumers relevant-consumers broker-state)

rebalance-participants

(rebalance-participants broker-state consumers participants-ch complete-ch)

Try to get all the consumers to participate in the rebalance, but if they don’t all check in, continue without some of them.

rebalance-participants-timeout

record->clj

(record->clj record)

record->topic-partition

(record->topic-partition record)

records->clj

(records->clj consumer-records)

reset-state!

(reset-state!)

send

(send producer topic k v)

send-async

(send-async producer topic k v)

shutdown!

(shutdown!)

standalone-mock-consumer-opts

start!

(start!)

txfm-messages

(txfm-messages consumer topic xf)
(txfm-messages consumer topic xf options)

Like txfm-subscribed-messages, but subscribes to the topic first.

txfm-subscribed-messages

(txfm-subscribed-messages consumer xf)
(txfm-subscribed-messages consumer xf {:keys [timeout ixf rf], :or {timeout 1000, ixf cat, rf conj}})

Helper to transform messages sent to the consumer on any existing subscriptions. Messages are transformed by the transducer xf. Returns as soon as xf indicates it has enough records, or when the timeout (in ms, 1000 by default) expires. To avoid waiting for the timeout, include e.g. (take 3) in xf. If the timeout expires, returns any messages transformed so far.

For example,

(kafka-mock/send producer "topic" "key" "5")
(kafka-mock/send producer "topic" "key" "6")
(kafka-mock/send producer "topic" "key" "7")
(kafka-mock/send producer "topic" "key" "8")
(.subscribe consumer ["topic"])
(kafka-mock/txfm-subscribed-messages consumer (comp (map :value)
                                                    (map #(Integer/parseInt %))
                                                    (filter even?)
                                                    (take 1)))
;; => [6]

By default, messages retrieved from (.poll consumer) are collected as by clojure.core/cat and fed one by one through xf. This behavior can be altered by providing a different ixf. For example, clojure.core/conj will feed each batch through in its entirety. This can be combined with max.poll.records to control batch size.

Also by default, the transformed messages will be accumulated as by clojure.core/conj, and returned. A different reducing function rf can be supplied. For example, if xf produces a stream of numbers, the reducing function clojure.core/+ will return their sum.
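The effect of the default ixf and of swapping rf can be seen with plain clojure.core functions. This is a sketch, assuming the polled batches are reduced roughly as (transduce (comp ixf xf) rf ...); that composition is an assumption about the internals, and the batches data below is made up, with each inner vector standing in for one (.poll consumer) result.

```clojure
;; Made-up data: two polls, two messages each.
(def batches
  [[{:value "5"} {:value "6"}]
   [{:value "7"} {:value "8"}]])

;; The same message-level transformation as in the example above,
;; without the (take 1) step.
(def xf
  (comp (map :value)
        (map #(Integer/parseInt %))
        (filter even?)))

;; cat flattens each batch so xf sees one message at a time;
;; conj accumulates the transformed values into a vector.
(def collected (transduce (comp cat xf) conj [] batches))
;; => [6 8]

;; Replacing the reducing function with + sums the values instead.
(def summed (transduce (comp cat xf) + 0 batches))
;; => 14

;; Adding (take 1) ends the reduction at the first match, which is
;; what lets the helper return before its timeout.
(def first-even (transduce (comp cat xf (take 1)) conj [] batches))
;; => [6]
```

Because (take 1) terminates the reduction early, the second batch is never consumed in the last form.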

with-test-broker

macro

(with-test-broker & body)

with-test-producer-consumer

macro

(with-test-producer-consumer producer-name consumer-name & body)