Spring Cloud Stream Application for Google Cloud Storage

// Libraries the application code compiles against. No versions are given here;
// they are expected to come from the BOMs imported in dependencyManagement.
dependencies {
// Google Cloud Storage starter — presumably supplies the Storage client used by the Java code; confirm.
implementation 'com.google.cloud:spring-cloud-gcp-starter-storage'
// Spring Integration file module — presumably the home of FileHeaders and the remote-file streaming support; confirm.
implementation 'org.springframework.integration:spring-integration-file'
}

// Imports two Maven BOMs so that dependency versions are managed centrally.
// springCloudGcpVersion and springCloudVersion are build properties that must be
// defined elsewhere in the build (not visible in this snippet).
dependencyManagement {
imports {
// Version alignment for the com.google.cloud Spring Cloud GCP artifacts.
mavenBom "com.google.cloud:spring-cloud-gcp-dependencies:${springCloudGcpVersion}"
// Version alignment for the Spring Cloud release train.
mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
}
}
/**
 * Builds the inbound channel adapter that streams objects from the {@code gcsBucket}
 * bucket onto {@code streamChannel}.
 *
 * <p>The poller is configured with {@code fixedDelay = "5000"}. Only objects whose name
 * contains {@code path} and ends with {@code fileExtension} pass the filter.
 *
 * @param gcs the Cloud Storage client used to create sessions
 * @return the configured streaming message source
 */
@Bean
@InboundChannelAdapter(channel = "streamChannel", poller = @Poller(fixedDelay = "5000"))
public MessageSource<InputStream> streamingAdapter(Storage gcs) {
    GcsSessionFactory sessionFactory = new GcsSessionFactory(gcs);
    GcsRemoteFileTemplate remoteTemplate = new GcsRemoteFileTemplate(sessionFactory);

    GcsStreamingMessageSource source = new GcsStreamingMessageSource(remoteTemplate);
    source.setRemoteDirectory(gcsBucket);
    // Keep only the objects under the watched path that carry the expected extension.
    source.setFilter(candidates ->
            Arrays.stream(candidates)
                    .filter(candidate -> candidate.getName().contains(path)
                            && candidate.getName().endsWith(fileExtension))
                    .collect(Collectors.toUnmodifiableList()));
    return source;
}
/**
 * Handles one file announced on {@code streamChannel}.
 *
 * <p>The file is moved to the "processing" location, transformed by {@code processFile},
 * the result is written to the "result" location, and the input is finally moved to the
 * "success" location. All target locations are derived from the incoming path by replacing
 * the text {@code "incoming"}.
 *
 * <p>If any step fails, the failure is logged and the input is moved to the "error" location
 * from wherever it currently is. If the failure happened before the file reached
 * "processing", it is moved from its original path rather than from a path that was never
 * created.
 *
 * <p>NOTE(review): {@code String.replace} substitutes every occurrence of "incoming",
 * including one inside the file name itself — confirm that object names never contain it.
 *
 * @param message message whose {@code FileHeaders.REMOTE_FILE} header names the object
 * @throws IllegalArgumentException if the message carries no remote-file header
 * @throws Exception if moving the file to the "error" location fails; the original
 *     processing failure is attached as a suppressed exception
 */
@ServiceActivator(inputChannel = "streamChannel")
public void filePoller(Message<?> message) throws Exception {
    String filePath = message.getHeaders().get(FileHeaders.REMOTE_FILE, String.class);
    log.info("File recieved as message ==> " + filePath);

    if (filePath == null) {
        // Without the header there is no object to process or to quarantine.
        throw new IllegalArgumentException(
                "Message has no " + FileHeaders.REMOTE_FILE + " header; cannot locate the file");
    }

    // Tracks where the input object lives right now, so the error path moves the real object.
    String currentPath = filePath;
    try {
        // Move to processing
        currentPath = move_file(filePath, filePath.replace("incoming", "processing"));
        String result = processFile(currentPath);

        // Write output file
        write_file(result, filePath.replace("incoming", "result"));

        // Move to success
        currentPath = move_file(currentPath, filePath.replace("incoming", "success"));
    } catch (Exception e) {
        log.error("Exception --> ", e);
        try {
            // Move to error, starting from the last location the file is known to be at
            move_file(currentPath, filePath.replace("incoming", "error"));
        } catch (RuntimeException moveFailure) {
            // Keep the root cause visible when the quarantine step itself fails.
            moveFailure.addSuppressed(e);
            throw moveFailure;
        }
    }
}
  1. As soon as a file is found, move it to the “processing” directory
  2. Read the file and map it to a POJO
  3. Calculate the bonus (business logic)
  4. Write the results to the “result” directory
  5. Move the input file to the “success” directory
  6. In case of an error, move the file to the “error” directory
/**
 * Moves an object inside {@code gcsBucket} by copying it to {@code targetFilePath} and then
 * deleting the source object.
 *
 * <p>If source and target are the same name, nothing is done: copy-then-delete on an
 * identical name would remove the only copy of the object.
 *
 * @param srcFilePath name of the object to move
 * @param targetFilePath name the object should have afterwards
 * @return {@code targetFilePath}
 * @throws IllegalStateException if no object named {@code srcFilePath} exists in the bucket
 */
public String move_file(String srcFilePath, String targetFilePath) {
    if (srcFilePath != null && srcFilePath.equals(targetFilePath)) {
        log.info("Source and target are both " + srcFilePath + "; nothing to move");
        return targetFilePath;
    }

    Blob blob = storage.get(gcsBucket, srcFilePath);
    if (blob == null) {
        // storage.get yields null for a missing object; fail with a message that names it.
        throw new IllegalStateException(
                "Cannot move " + srcFilePath + ": object not found in bucket " + gcsBucket);
    }

    CopyWriter copyWriter = blob.copyTo(gcsBucket, targetFilePath);
    Blob copiedBlob = copyWriter.getResult();
    if (!blob.delete()) {
        // The copy succeeded, so the move is effectively done; report the leftover source.
        log.warn("Source object " + srcFilePath + " in bucket " + gcsBucket
                + " was not deleted after copy (already gone?)");
    }

    log.info("Moved object "
            + srcFilePath
            + " from bucket "
            + gcsBucket
            + " to "
            + targetFilePath
            + " in bucket "
            + copiedBlob.getBucket());
    return targetFilePath;
}
/**
 * Reads the object at {@code filePath} from {@code gcsBucket}, deserializes its JSON content
 * into an {@code Employee}, applies {@code applyTransformation} and returns the transformed
 * value serialized back to JSON.
 *
 * <p>The content is decoded as UTF-8, matching the encoding {@code write_file} uses when
 * writing, instead of depending on the JVM's platform default charset.
 *
 * @param filePath name of the object to read
 * @return JSON representation of the transformed employee
 * @throws IllegalStateException if no object named {@code filePath} exists in the bucket
 * @throws Exception if the content cannot be parsed or the result cannot be serialized
 */
public String processFile(String filePath) throws Exception {
    Blob blob = storage.get(gcsBucket, filePath);
    if (blob == null) {
        // storage.get yields null for a missing object; fail with a message that names it.
        throw new IllegalStateException(
                "Cannot process " + filePath + ": object not found in bucket " + gcsBucket);
    }

    String fileContent = new String(blob.getContent(), StandardCharsets.UTF_8);
    log.debug("filecontent == " + fileContent);

    ObjectMapper om = new ObjectMapper();
    Employee e = om.readValue(fileContent, Employee.class);

    return om.writeValueAsString(applyTransformation(e));
}
/**
 * Stores {@code content} as a UTF-8 encoded {@code text/plain} object named
 * {@code targetFilePath} in {@code gcsBucket}, then logs the written location.
 *
 * @param content text to store
 * @param targetFilePath name of the object to create
 */
public void write_file(String content, String targetFilePath) {
    BlobInfo destination = BlobInfo.newBuilder(BlobId.of(gcsBucket, targetFilePath))
            .setContentType("text/plain")
            .build();
    byte[] payload = content.getBytes(StandardCharsets.UTF_8);
    storage.create(destination, payload);

    String summary = "Files written " + targetFilePath + " in bucket " + gcsBucket;
    log.info(summary);
}
{
"name": "Shawn Reed",
"department": "technology",
"salary": "100000",
"rating": "5",
"period_of_service": "5"
}

--

--

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store
Paras Bansal

Paras Bansal

Solutions Architect, Cloud Enthusiast