|
8 | 8 | [ziggurat.message-payload :refer [->MessagePayload]] |
9 | 9 | [ziggurat.metrics :as metrics] |
10 | 10 | [ziggurat.timestamp-transformer :as timestamp-transformer] |
11 | | - [ziggurat.tracer :refer [tracer]] |
12 | 11 | [ziggurat.util.map :as umap] |
13 | 12 | [cambium.core :as clog]) |
14 | | - (:import [io.opentracing.contrib.kafka TracingKafkaUtils] |
15 | | - [io.opentracing.contrib.kafka.streams TracingKafkaClientSupplier] |
16 | | - [io.opentracing.tag Tags] |
17 | | - [java.time Duration] |
| 13 | + (:import [java.time Duration] |
18 | 14 | [java.util Properties] |
19 | 15 | [java.util.regex Pattern] |
20 | 16 | [org.apache.kafka.common.errors TimeoutException] |
|
126 | 122 | (doseq [[topic-entity stream] streams] |
127 | 123 | (close-stream topic-entity stream))) |
128 | 124 |
|
129 | | -(defn- traced-handler-fn [handler-fn channels message topic-entity] |
130 | | - (let [parent-ctx (TracingKafkaUtils/extractSpanContext (:headers message) tracer) |
131 | | - span (as-> tracer t |
132 | | - (.buildSpan t "Message-Handler") |
133 | | - (.withTag t (.getKey Tags/SPAN_KIND) Tags/SPAN_KIND_CONSUMER) |
134 | | - (.withTag t (.getKey Tags/COMPONENT) "ziggurat") |
135 | | - (if (nil? parent-ctx) |
136 | | - t |
137 | | - (.asChildOf t parent-ctx)) |
138 | | - (.start t))] |
139 | | - (try |
140 | | - ((mapper-func handler-fn channels) (-> (->MessagePayload (:value message) topic-entity) |
141 | | - (assoc :headers (:headers message)) |
142 | | - (assoc :metadata (:metadata message)))) |
143 | | - (finally |
144 | | - (.finish span))))) |
| 125 | +(defn- mapped-handler-fn [handler-fn channels message topic-entity] |
| 126 | + (try |
| 127 | + ((mapper-func handler-fn channels) |
| 128 | + (-> (->MessagePayload (:value message) topic-entity) |
| 129 | + (assoc :headers (:headers message)) |
| 130 | + (assoc :metadata (:metadata message)))) |
| 131 | + (finally))) |
145 | 132 |
|
146 | 133 | (defn- join-streams |
147 | 134 | [oldest-processed-message-in-s topic-entity stream-1 stream-2] |
|
187 | 174 | {stream :stream} (reduce (partial join-streams oldest-processed-message-in-s topic-entity) stream-map)] |
188 | 175 | (->> stream |
189 | 176 | (header-transform-values) |
190 | | - (map-values #(traced-handler-fn handler-fn channels % topic-entity))) |
| 177 | + (map-values #(mapped-handler-fn handler-fn channels % topic-entity))) |
191 | 178 | (.build builder)))) |
192 | 179 |
|
193 | 180 | (defn- topology [handler-fn {:keys [origin-topic oldest-processed-message-in-s]} topic-entity channels] |
|
198 | 185 | (timestamp-transform-values topic-entity-name oldest-processed-message-in-s) |
199 | 186 | (header-transform-values) |
200 | 187 | (map-values #(log-and-report-metrics topic-entity-name %)) |
201 | | - (map-values #(traced-handler-fn handler-fn channels % topic-entity))) |
| 188 | + (map-values #(mapped-handler-fn handler-fn channels % topic-entity))) |
202 | 189 | (.build builder))) |
203 | 190 |
|
204 | 191 | (defn- start-stream* [handler-fn stream-config topic-entity channels] |
|
209 | 196 |
|
210 | 197 | (when-not (nil? top) |
211 | 198 | (KafkaStreams. ^Topology top |
212 | | - ^Properties (properties stream-config) |
213 | | - (new TracingKafkaClientSupplier tracer))))) |
| 199 | + ^Properties (properties stream-config))))) |
214 | 200 |
|
215 | 201 | (defn- merge-consumer-type-config |
216 | 202 | [config] |
|
0 commit comments