Spring Cloud Stream Application for Google Cloud Storage

Paras Bansal
5 min read · May 27, 2022

From the Spring documentation:

Spring Integration enables lightweight messaging within Spring-based applications and supports integration with external systems via declarative adapters. Those adapters provide a higher-level of abstraction over Spring’s support for remoting, messaging, and scheduling. Spring Integration’s primary goal is to provide a simple model for building enterprise integration solutions while maintaining the separation of concerns that is essential for producing maintainable, testable code.

Many use cases involve polling a GCS or S3 location for a file and processing it as soon as it arrives. Spring Integration can integrate with cloud providers and lets you build message-driven microservices.

With Spring Integration you can build a streaming application that streams file contents into the application, processes them, and writes the output. AWS (via Lambda) and GCP (via Cloud Functions) support this too, but you would need to write a lot of infrastructure code in addition to application code. You would also be limited in local storage, memory, and CPU when using those services.

This simple example explores a Spring Integration implementation on Google Cloud Storage. It is not cloud agnostic, but the same approach should carry over to any major cloud provider.

build.gradle dependencies

dependencies {
    implementation 'com.google.cloud:spring-cloud-gcp-starter-storage'
    implementation 'org.springframework.integration:spring-integration-file'
}

dependencyManagement {
    imports {
        mavenBom "com.google.cloud:spring-cloud-gcp-dependencies:${springCloudGcpVersion}"
        mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
    }
}

As you can see, apart from the default Spring application, I am using only two additional libraries: GCP Storage and Spring Integration.

MessageSource adapter: you can create an adapter bean using the following method:

@Bean
@InboundChannelAdapter(channel = "streamChannel", poller = @Poller(fixedDelay = "5000"))
public MessageSource<InputStream> streamingAdapter(Storage gcs) {
    GcsStreamingMessageSource adapter =
            new GcsStreamingMessageSource(new GcsRemoteFileTemplate(new GcsSessionFactory(gcs)));
    // The remote "directory" is the bucket name.
    adapter.setRemoteDirectory(gcsBucket);
    // Keep only objects whose name contains the configured path and ends with the extension.
    adapter.setFilter(files ->
            Arrays.stream(files)
                    .filter(f -> f.getName().contains(path))
                    .filter(f -> f.getName().endsWith(fileExtension))
                    .collect(Collectors.toUnmodifiableList())
    );
    return adapter;
}

EDIT

Recently I found a performance issue with this Google driver (it might be fixed in the future).

The call new GcsSessionFactory(gcs) creates a GcsSession, and because this example lists files, the adapter ends up calling GcsSession.list.

The implementation of GcsSession.list is here:

https://github.com/spring-attic/spring-cloud-gcp/blob/5634d4b1b4ac63d9c09a67a23827f03034bf72d1/spring-cloud-gcp-storage/src/main/java/org/springframework/cloud/gcp/storage/integration/GcsSession.java#L75

As per the code, it only supports listing of whole buckets and not folders:

@Override
public BlobInfo[] list(String path) throws IOException {
    // TODO(joaomartins): Only supports listing buckets, not folders.

    Collection<BlobInfo> blobs = new ArrayList<>();

    for (Blob blob : this.gcs.list(path).iterateAll()) {
        blobs.add(blob);
    }

    return blobs.toArray(new BlobInfo[blobs.size()]);
}

This creates a performance issue when the bucket contains a lot of files and you want to poll only a single folder. The workaround is to use the GCS client directly, as below:

import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

@Scheduled(fixedDelay = 10000)
public void filePoller() {
    log.info("Looking for files under == gs://" + gcsBucket + "/" + gcsBasePath + "/" + inputDir);
    ArrayList<BlobInfo> blobs = new ArrayList<>();

    for (Blob blob : gcs.list(gcsBucket,
            Storage.BlobListOption.prefix(gcsBasePath + "/" + inputDir)).iterateAll()) {
        blobs.add(blob);
    }

    log.info("files found == " + blobs);

    for (BlobInfo b : blobs) {
        processBlob(b.getName());
    }
}

With the prefix BlobListOption, the GCS client lists only the objects under that folder and not the entire bucket.
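One detail worth checking when building the prefix: GCS matches it as a plain string prefix on object names, so a prefix without a trailing slash such as base/incoming would also match a sibling folder like base/incoming-old/. The sketch below is a hypothetical helper (GcsPrefixes and its methods are not part of the article's code or of the GCS client) that normalizes path segments into a folder prefix and drops zero-byte "folder" placeholder objects, whose names end in a slash, from a list of object names.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical helper for illustration; not part of the GCS client library. */
final class GcsPrefixes {

    private GcsPrefixes() {}

    /** Joins path segments into a prefix that always ends with exactly one '/'. */
    static String folderPrefix(String... segments) {
        StringBuilder sb = new StringBuilder();
        for (String segment : segments) {
            // Strip leading and trailing slashes so "a/" + "/b" does not become "a//b".
            String trimmed = segment.replaceAll("^/+|/+$", "");
            if (!trimmed.isEmpty()) {
                sb.append(trimmed).append('/');
            }
        }
        return sb.toString();
    }

    /** Keeps names under the prefix and drops placeholder objects whose name ends in '/'. */
    static List<String> filesUnder(String prefix, List<String> objectNames) {
        return objectNames.stream()
                .filter(name -> name.startsWith(prefix))
                .filter(name -> !name.endsWith("/"))
                .collect(Collectors.toList());
    }
}
```

Under these assumptions, the value of GcsPrefixes.folderPrefix(gcsBasePath, inputDir) could be passed to Storage.BlobListOption.prefix in place of the hand-built string.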

So, if you want to resolve the potential performance issue, there are two options:

  1. Extend the Google Spring Integration code in your own repository and modify it to list a single folder (which the GCS driver supports). This way you can keep using the Spring Integration framework.
  2. Use a Spring scheduler-based file poller, as above.

Returning to the adapter bean: the @InboundChannelAdapter annotation takes a channel name and a poller with a delay in milliseconds.

From the documentation:

Spring Integration provides a wide selection of channel adapters and gateways to communicate with external systems. Channel Adapters are used for one-way integration (send or receive); gateways are used for request/reply scenarios (inbound or outbound). For a full list of adapters and gateways, refer to the reference documentation.

In the adapter bean, I am using GcsStreamingMessageSource from the GCP storage integration, which implements Spring's MessageSource abstraction. I am also setting filters on the file path and extension so that only files from a particular directory are picked up.

ServiceActivator: indicates that a method is capable of handling a message or message payload.
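The filter in the adapter matches on contains(path), which accepts the path anywhere in the object name. As a rough sketch of a stricter variant, the predicate below requires the name to start with the directory and end with the extension. NameFilters and its method names are made up for this illustration, and it works on plain strings rather than BlobInfo so that it stays self-contained.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/** Hypothetical name filter; the adapter in the article filters BlobInfo objects instead. */
final class NameFilters {

    private NameFilters() {}

    /** Accepts names that sit under the directory and end with the given extension. */
    static Predicate<String> inDirectoryWithExtension(String directory, String extension) {
        // Normalize to a trailing slash so "incoming" does not match "incoming-old/".
        String prefix = directory.endsWith("/") ? directory : directory + "/";
        return name -> name.startsWith(prefix) && name.endsWith(extension);
    }

    /** Applies a filter to an array of names, mirroring the shape of the adapter's filter. */
    static List<String> apply(String[] names, Predicate<String> filter) {
        return Arrays.stream(names).filter(filter).collect(Collectors.toList());
    }
}
```

For example, with the directory incoming and the extension .json, a name such as archive/incoming/old.json would pass a contains check but is rejected here.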

@ServiceActivator(inputChannel = "streamChannel")
public void filePoller(Message<?> message) throws Exception {
    String filePath = message.getHeaders().get(FileHeaders.REMOTE_FILE, String.class);
    log.info("File received as message ==> " + filePath);

    try {
        // Move to processing
        String targetFilePath = move_file(filePath, filePath.replace("incoming", "processing"));
        String result = processFile(targetFilePath);

        // Write output file
        write_file(result, filePath.replace("incoming", "result"));

        // Move to success
        move_file(targetFilePath, filePath.replace("incoming", "success"));
    } catch (Exception e) {
        log.error("Exception --> ", e);
        // Move to error
        move_file(filePath.replace("incoming", "processing"), filePath.replace("incoming", "error"));
    }
}

Make sure that the channel name matches the one on the adapter. This method implements the overall functionality of the application, so let's discuss the application a little.

In this application, I calculate employee bonuses based on the rating each employee received and their number of years of service.

The application reads a JSON file from the “incoming” directory, calculates the bonus, and writes the output to the “result” directory. Here is the flow:

  1. Poll the “incoming” directory
  2. As soon as a file is found, move it to the “processing” directory
  3. Read the file and map it to a POJO
  4. Calculate the bonus (business logic)
  5. Write the output to the “result” directory
  6. Move the input file to the “success” directory
  7. In case of error, move the file to the “error” directory
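The steps above can be sketched without any cloud dependency. The class below is a toy model that uses an in-memory map as a stand-in for the bucket; StageFlow and all of its methods are hypothetical, and it assumes the stage directory is the first segment of the object path. It also swaps only that leading segment, so a file name that happens to contain the word "incoming" is left untouched.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.function.UnaryOperator;

/** In-memory stand-in for a bucket; every name here is hypothetical. */
final class StageFlow {

    private final Map<String, String> bucket = new TreeMap<>();

    void put(String path, String content) { bucket.put(path, content); }

    boolean exists(String path) { return bucket.containsKey(path); }

    String read(String path) { return bucket.get(path); }

    /** Swaps only the leading stage directory and leaves the rest of the path alone. */
    static String toStage(String path, String from, String to) {
        String prefix = from + "/";
        if (!path.startsWith(prefix)) {
            throw new IllegalArgumentException(path + " is not under " + prefix);
        }
        return to + "/" + path.substring(prefix.length());
    }

    /** Models a move as "store under the new name, remove the old name". */
    private String move(String from, String to) {
        bucket.put(to, bucket.remove(from));
        return to;
    }

    /** Runs steps 2 to 7 for one file and returns the stage it ended up in. */
    String handle(String incomingPath, UnaryOperator<String> businessLogic) {
        // Step 2: take the file out of "incoming" so the next poll does not see it.
        String processing = move(incomingPath, toStage(incomingPath, "incoming", "processing"));
        try {
            // Steps 3 and 4: read the content and apply the business logic.
            String result = businessLogic.apply(bucket.get(processing));
            // Step 5: write the output.
            bucket.put(toStage(incomingPath, "incoming", "result"), result);
            // Step 6: archive the input as a success.
            move(processing, toStage(incomingPath, "incoming", "success"));
            return "success";
        } catch (RuntimeException e) {
            // Step 7: park the input in "error".
            move(processing, toStage(incomingPath, "incoming", "error"));
            return "error";
        }
    }
}
```

In this model the first move happens before the try block, so a failure there leaves the file in "incoming" rather than attempting to move a "processing" copy that was never created.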

Moving a file from the incoming to the processing directory is very important. If you don’t move it, Spring will detect the same file again after the polling interval. So the polling interval becomes very important here.

To see which files were processed or failed, you can look at the directories, or you can store the state in a database.
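If a move can take longer than the polling interval, one possible extra safeguard is to remember which object names have already been picked up. The sketch below is an assumption-laden illustration rather than anything from the article's application: SeenFiles is a made-up class, it keeps its state in memory only, so it is lost on restart and does not coordinate between several application instances.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory guard against handling the same object name twice. */
final class SeenFiles {

    // A concurrent set, so the poller thread and handler threads can share it safely.
    private final Set<String> claimed = ConcurrentHashMap.newKeySet();

    /** Returns true only for the first caller that claims this object name. */
    boolean claim(String objectName) {
        return claimed.add(objectName);
    }

    /** Releases a claim, for example after a failed move, so a later poll can retry. */
    void release(String objectName) {
        claimed.remove(objectName);
    }
}
```

A handler could call claim(filePath) first and return immediately when it gets false, then call release(filePath) only if the initial move fails.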

Moving files from GCS —

public String move_file(String srcFilePath, String targetFilePath) {
    Blob blob = storage.get(gcsBucket, srcFilePath);
    CopyWriter copyWriter = blob.copyTo(gcsBucket, targetFilePath);
    Blob copiedBlob = copyWriter.getResult();
    blob.delete();

    log.info("Moved object "
            + srcFilePath
            + " from bucket "
            + gcsBucket
            + " to "
            + targetFilePath
            + " in bucket "
            + copiedBlob.getBucket());
    return targetFilePath;
}

Reading files from GCS and mapping to POJO —

public String processFile(String filePath) throws Exception {
    Blob blob = storage.get(gcsBucket, filePath);
    // Decode explicitly as UTF-8 instead of relying on the platform default charset.
    String fileContent = new String(blob.getContent(), StandardCharsets.UTF_8);
    log.debug("filecontent == " + fileContent);

    ObjectMapper om = new ObjectMapper();
    Employee e = om.readValue(fileContent, Employee.class);

    return om.writeValueAsString(applyTransformation(e));
}

Writing files to GCS —

public void write_file(String content, String targetFilePath) {
    BlobId blobId = BlobId.of(gcsBucket, targetFilePath);
    BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setContentType("text/plain").build();
    storage.create(blobInfo, content.getBytes(StandardCharsets.UTF_8));

    log.info("Files written "
            + targetFilePath
            + " in bucket "
            + gcsBucket);
}

JSON —

{
    "name": "Shawn Reed",
    "department": "technology",
    "salary": "100000",
    "rating": "5",
    "period_of_service": "5"
}
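The article does not show applyTransformation, so the bonus rule itself is unknown. Purely as an illustration of how the string fields in the JSON above could feed a calculation, here is a sketch with an invented formula: 2% of salary per rating point plus 1% of salary per year of service, capped at 10 years. The class name, the rates, and the cap are all assumptions.

```java
/** Hypothetical bonus rule; the rates and the cap below are invented for illustration. */
final class BonusSketch {

    // Assumed rates, not taken from the article.
    static final long PERCENT_PER_RATING_POINT = 2;
    static final long PERCENT_PER_SERVICE_YEAR = 1;
    static final long MAX_SERVICE_YEARS = 10;

    private BonusSketch() {}

    /** Parses the string fields as they appear in the JSON and returns the bonus amount. */
    static long bonus(String salary, String rating, String periodOfService) {
        long base = Long.parseLong(salary.trim());
        long points = Long.parseLong(rating.trim());
        long years = Long.parseLong(periodOfService.trim());
        if (base < 0 || points < 0 || years < 0) {
            throw new IllegalArgumentException("salary, rating and service must be non-negative");
        }
        long cappedYears = Math.min(years, MAX_SERVICE_YEARS);
        long ratingPart = base * points * PERCENT_PER_RATING_POINT / 100;
        long servicePart = base * cappedYears * PERCENT_PER_SERVICE_YEAR / 100;
        return ratingPart + servicePart;
    }
}
```

With the sample record, BonusSketch.bonus("100000", "5", "5") gives 10000 for the rating part and 5000 for the service part, so 15000 under this made-up rule.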

Well, that’s it. You can build far more complex applications with this simple strategy. I saw a similar example on Medium from Olga Grygorian that publishes a Kafka message after the processing is complete. The possibilities are endless.

Thank you! Please leave your feedback.
