-
Notifications
You must be signed in to change notification settings - Fork 2
PubSub background function adapter #14
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,108 @@ | ||
| /* | ||
| * Copyright 2012-2019 the original author or authors. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * https://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.springframework.cloud.function.adapter.gcloud; | ||
|
|
||
| import java.util.ArrayList; | ||
| import java.util.Collection; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.Optional; | ||
|
|
||
| import com.google.cloud.functions.Context; | ||
| import com.google.cloud.functions.HttpResponse; | ||
| import org.reactivestreams.Publisher; | ||
| import reactor.core.publisher.Flux; | ||
|
|
||
| import org.springframework.cloud.function.context.AbstractSpringFunctionAdapterInitializer; | ||
| import org.springframework.messaging.Message; | ||
|
|
||
| public class GcfHandler<O> extends AbstractSpringFunctionAdapterInitializer<Context> { | ||
|
|
||
| public GcfHandler(Class<?> configurationClass) { | ||
| super(configurationClass); | ||
| init(); | ||
| } | ||
|
|
||
| public GcfHandler() { | ||
| super(); | ||
| init(); | ||
| } | ||
|
|
||
| public void init() { | ||
| Thread.currentThread().setContextClassLoader(GcfSpringBootHttpRequestHandler.class.getClassLoader()); | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Set classloader to the GcfHandler class |
||
| initialize(null); | ||
| } | ||
|
|
||
| Object toOptionalIfEmpty(String requestBody) { | ||
| return requestBody.isEmpty() ? Optional.empty() : requestBody; | ||
| } | ||
|
|
||
| protected boolean functionAcceptsMessage() { | ||
| return this.getInspector().isMessage(function()); | ||
| } | ||
|
|
||
| @SuppressWarnings("unchecked") | ||
| protected <T> T result(Object input, Publisher<?> output, HttpResponse resp) { | ||
| List<T> result = new ArrayList<>(); | ||
| for (Object value : Flux.from(output).toIterable()) { | ||
| result.add((T) convertOutputAndHeaders(value, resp)); | ||
| } | ||
| if (isSingleValue(input) && result.size() == 1) { | ||
| return result.get(0); | ||
| } | ||
| return (T) result; | ||
| } | ||
|
|
||
| private boolean isSingleValue(Object input) { | ||
| return !(input instanceof Collection); | ||
| } | ||
|
|
||
| Flux<?> extract(Object input) { | ||
| if (input instanceof Collection) { | ||
| return Flux.fromIterable((Iterable<?>) input); | ||
| } | ||
| return Flux.just(input); | ||
| } | ||
|
|
||
| protected O convertOutputAndHeaders(Object output, HttpResponse resp) { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe for adding background function we should take a similar approach to the PR for the HttpFunction, like just keeping it minimal without headers etc. Would be easier to review and get in to the Spring team You could make a branch from this branch to save your code as a checkpoint, then in this PR reduce it to the minimum functionality needed to get PubSub background function working? similar to what Mike sent to the Spring team?
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree. We should try to keep the PRs smaller. |
||
| if (output instanceof Message) { | ||
| Message<?> message = (Message<?>) output; | ||
| for (Map.Entry<String, Object> entry : message.getHeaders().entrySet()) { | ||
| Object values = entry.getValue(); | ||
| if (values instanceof List) { | ||
| for (Object value : (List) values) { | ||
| if (value != null) { | ||
| resp.appendHeader(entry.getKey(), value.toString()); | ||
| } | ||
| } | ||
| } | ||
| else if (values != null) { | ||
| resp.appendHeader(entry.getKey(), values.toString()); | ||
| } | ||
| } | ||
| return (O) message.getPayload(); | ||
| } | ||
| else { | ||
| return (O) output; | ||
| } | ||
| } | ||
|
|
||
| boolean returnsOutput() { | ||
| return !this.getInspector().getOutputType(function()).equals(Void.class); | ||
| } | ||
|
|
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,103 @@ | ||
| /* | ||
| * Copyright 2012-2019 the original author or authors. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * https://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.springframework.cloud.function.adapter.gcloud; | ||
|
|
||
| import java.io.BufferedReader; | ||
| import java.io.BufferedWriter; | ||
| import java.io.IOException; | ||
| import java.util.HashMap; | ||
| import java.util.Map; | ||
|
|
||
| import com.fasterxml.jackson.databind.ObjectMapper; | ||
| import com.google.cloud.functions.HttpFunction; | ||
| import com.google.cloud.functions.HttpRequest; | ||
| import com.google.cloud.functions.HttpResponse; | ||
| import org.reactivestreams.Publisher; | ||
|
|
||
| import org.springframework.beans.factory.annotation.Autowired; | ||
| import org.springframework.messaging.MessageHeaders; | ||
| import org.springframework.messaging.support.GenericMessage; | ||
|
|
||
| /** | ||
| * Implementation of HttpFunction for Google Cloud Function. | ||
| * | ||
| * @param <O> input type | ||
| * @author Dmitry Solomakha | ||
| */ | ||
| public class GcfSpringBootHttpRequestHandlerOriginal<O> extends GcfHandler<O> implements HttpFunction { | ||
|
|
||
| public GcfSpringBootHttpRequestHandlerOriginal(Class<?> configurationClass) { | ||
| super(configurationClass); | ||
| } | ||
|
|
||
| public GcfSpringBootHttpRequestHandlerOriginal() { | ||
| super(); | ||
| } | ||
|
|
||
| @Autowired | ||
| private ObjectMapper mapper; | ||
|
|
||
| @Override | ||
| public void service(HttpRequest httpRequest, HttpResponse httpResponse) throws Exception { | ||
| Publisher<?> output = apply(extract(convert(httpRequest))); | ||
| BufferedWriter writer = httpResponse.getWriter(); | ||
| Object result = result(httpRequest, output, httpResponse); | ||
| if (returnsOutput()) { | ||
| writer.write(mapper.writeValueAsString(result)); | ||
| writer.flush(); | ||
| } | ||
| httpResponse.setStatusCode(200); | ||
| } | ||
|
|
||
| private Object convert(HttpRequest event) throws IOException { | ||
| BufferedReader br = event.getReader(); | ||
| StringBuilder sb = new StringBuilder(); | ||
|
|
||
| char[] buffer = new char[1024 * 4]; | ||
| int n; | ||
| while (-1 != (n = br.read(buffer))) { | ||
| sb.append(buffer, 0, n); | ||
| } | ||
|
|
||
| String requestBody = sb.toString(); | ||
| if (functionAcceptsMessage()) { | ||
| return new GenericMessage<>(toOptionalIfEmpty(requestBody), getHeaders(event)); | ||
| } | ||
| return toOptionalIfEmpty(requestBody); | ||
| } | ||
|
|
||
| private MessageHeaders getHeaders(HttpRequest event) { | ||
| Map<String, Object> headers = new HashMap<String, Object>(); | ||
|
|
||
| if (event.getHeaders() != null) { | ||
| headers.putAll(event.getHeaders()); | ||
| } | ||
| if (event.getQueryParameters() != null) { | ||
| headers.putAll(event.getQueryParameters()); | ||
| } | ||
| if (event.getUri() != null) { | ||
| headers.put("path", event.getPath()); | ||
| } | ||
|
|
||
| if (event.getMethod() != null) { | ||
| headers.put("httpMethod", event.getMethod()); | ||
| } | ||
|
|
||
| return new MessageHeaders(headers); | ||
| } | ||
|
|
||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,74 @@ | ||
| /* | ||
| * Copyright 2012-2019 the original author or authors. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * https://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.springframework.cloud.function.adapter.gcloud; | ||
|
|
||
| import java.util.Base64; | ||
| import java.util.HashMap; | ||
| import java.util.Map; | ||
|
|
||
| import com.google.cloud.functions.BackgroundFunction; | ||
| import com.google.cloud.functions.Context; | ||
| import reactor.core.publisher.Flux; | ||
|
|
||
| import org.springframework.messaging.MessageHeaders; | ||
| import org.springframework.messaging.support.GenericMessage; | ||
|
|
||
| import static java.nio.charset.StandardCharsets.UTF_8; | ||
|
|
||
| public class GcfSpringBootPubSubFunctionHandler<O> extends GcfHandler<O> | ||
| implements BackgroundFunction<PubSubMessage> { | ||
|
|
||
| public GcfSpringBootPubSubFunctionHandler(Class<?> configurationClass) { | ||
| super(configurationClass); | ||
| } | ||
|
|
||
| public GcfSpringBootPubSubFunctionHandler() { | ||
| super(); | ||
| } | ||
|
|
||
| @Override | ||
| public void accept(PubSubMessage pubSubMessage, Context context) { | ||
| Flux.from(apply(extract(toMessageIfNeeded(pubSubMessage)))).blockLast(); | ||
| } | ||
|
|
||
| private Object toMessageIfNeeded(PubSubMessage pubSubMessage) { | ||
| String data = new String(Base64.getDecoder().decode(pubSubMessage.getData()), UTF_8); | ||
| if (functionAcceptsMessage()) { | ||
| return new GenericMessage<>(toOptionalIfEmpty(data), getHeaders(pubSubMessage)); | ||
| } | ||
| return toOptionalIfEmpty(data); | ||
| } | ||
|
|
||
| private Map<String, Object> getHeaders(PubSubMessage pubSubMessage) { | ||
| Map<String, Object> headers = new HashMap<String, Object>(); | ||
|
|
||
| if (pubSubMessage.getAttributes() != null) { | ||
| headers.putAll(pubSubMessage.getAttributes()); | ||
| } | ||
|
|
||
| if (pubSubMessage.getMessageId() != null) { | ||
| headers.put("messageId", pubSubMessage.getMessageId()); | ||
| } | ||
|
|
||
| if (pubSubMessage.getPublishTime() != null) { | ||
| headers.put("publishTime", pubSubMessage.getPublishTime()); | ||
| } | ||
|
|
||
| return new MessageHeaders(headers); | ||
| } | ||
|
|
||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,73 @@ | ||
| /* | ||
| * Copyright 2012-2019 the original author or authors. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * https://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.springframework.cloud.function.adapter.gcloud; | ||
|
|
||
| import java.util.Map; | ||
|
|
||
| public class PubSubMessage { | ||
|
|
||
| String data; | ||
|
|
||
| Map<String, String> attributes; | ||
|
|
||
| String messageId; | ||
|
|
||
| String publishTime; | ||
|
|
||
| public PubSubMessage() { | ||
| } | ||
|
|
||
| PubSubMessage(String data, Map<String, String> attributes, String messageId, String publishTime) { | ||
| this.data = data; | ||
| this.attributes = attributes; | ||
| this.messageId = messageId; | ||
| this.publishTime = publishTime; | ||
| } | ||
|
|
||
| public String getData() { | ||
| return data; | ||
| } | ||
|
|
||
| public void setData(String data) { | ||
| this.data = data; | ||
| } | ||
|
|
||
| public Map<String, String> getAttributes() { | ||
| return attributes; | ||
| } | ||
|
|
||
| public void setAttributes(Map<String, String> attributes) { | ||
| this.attributes = attributes; | ||
| } | ||
|
|
||
| public String getMessageId() { | ||
| return messageId; | ||
| } | ||
|
|
||
| public void setMessageId(String messageId) { | ||
| this.messageId = messageId; | ||
| } | ||
|
|
||
| public String getPublishTime() { | ||
| return publishTime; | ||
| } | ||
|
|
||
| public void setPublishTime(String publishTime) { | ||
| this.publishTime = publishTime; | ||
| } | ||
|
|
||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it be better to just have the background function and httpFunction extend from the AbstractSpringFunctionAdapterInitializer directly? I think right now for HttpFunction to extend from GcfHandler might not be ideal if it doesn't have access to the Context.