Spring Boot File Streaming application with data validations

Paras Bansal
4 min read · Sep 23, 2022

This post is a follow-up to one of my previous stories, on Spring Cloud file streaming. Though many companies are trying to move to cloud-based solutions, more than 50% of workloads still run on-prem. A lot of applications are still being developed for deployment on on-prem servers, and one of the most basic requirements there is a file poller.

This application uses the Spring Boot framework with spring-integration and spring-integration-file to build a messaging-based application for files arriving in a local directory. So instead of using the Spring scheduler, which is essentially a cron-style job, we implement a true messaging platform that delivers the arrival of a file as a message and executes the business logic whenever a file shows up.

From the Spring documentation:

Spring Integration is motivated by the following goals:

Provide a simple model for implementing complex enterprise integration solutions.

Facilitate asynchronous, message-driven behavior within a Spring-based application.

Promote intuitive, incremental adoption for existing Spring users.

A few things about the sample application: we’ll process an orders.csv file to calculate the final amount for each line item, based on the discount provided.

Sample content:

order_number,order_date,order_item,unit_price,units,discount
100,2022-09-23,,10.00,100,10.00
101,2022-09-23,Apple,10.00,100,25.00

Sample expected output:

"order_number","order_date","order_item","unit_price","units","discount","final_price"
"100","2022-09-23","","10.00","100","10.00","900.0"
"101","2022-09-23","Apple","10.00","100","25.00","750.0"

We’ll be using jackson-dataformat-csv to read and write the CSV.

Coming to the implementation now. We’ll implement the Channel Adapter functionality as below:

[Channel Adapter diagram. Source: Spring Documentation]

The Channel Adapter here connects the message channel to the source. The adapter receives messages from the Message Source, and since it is a pollable source in our implementation, we need to provide the polling interval.

And we’ll implement the Service Activator:

[Service Activator diagram. Source: Spring Documentation]

The Service Activator here processes the request message.

→ Message Channel

@Bean
public MessageChannel csvChannel() {
    return new DirectChannel();
}

A DirectChannel dispatches each message to a single subscriber; if multiple subscribers are attached to the same channel, it load-balances between them in round-robin fashion by default.
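
To illustrate that round-robin behavior (this is not part of the sample app, and the method names are made up): if two service activators subscribed to the same channel, messages would alternate between them:

// Illustration only: two subscribers on one DirectChannel take turns
@ServiceActivator(inputChannel = "csvChannel")
public void subscriberA(Message<?> message) {
    // receives messages 1, 3, 5, ...
}

@ServiceActivator(inputChannel = "csvChannel")
public void subscriberB(Message<?> message) {
    // receives messages 2, 4, 6, ...
}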

→ Channel Adapter

@Bean
@InboundChannelAdapter(value = "csvChannel", poller = @Poller(fixedDelay = "10000"))
public MessageSource<File> csvFileReadingMessageSource() {
    // Poll the input directory every 10 seconds for files matching the pattern
    FileReadingMessageSource sourceReader = new FileReadingMessageSource();
    sourceReader.setDirectory(new File(config.getCsv_input_dir()));
    sourceReader.setFilter(new SimplePatternFileListFilter(config.getCsv_file_pattern()));
    return sourceReader;
}
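
The config object referenced here isn’t shown in the post; a minimal sketch of what it could look like, assuming a @ConfigurationProperties holder bound to the app: block of application.yaml shown later (class name and field style are illustrative):

@Component
@ConfigurationProperties(prefix = "app")
public class AppConfig {

    private String csv_input_dir;
    private String csv_file_pattern;
    // csv_processing_dir, csv_success_dir, csv_error_dir and csv_output_dir
    // follow the same pattern

    public String getCsv_input_dir() { return csv_input_dir; }
    public void setCsv_input_dir(String csv_input_dir) { this.csv_input_dir = csv_input_dir; }

    public String getCsv_file_pattern() { return csv_file_pattern; }
    public void setCsv_file_pattern(String csv_file_pattern) { this.csv_file_pattern = csv_file_pattern; }
}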

→ Service Activator

@ServiceActivator(inputChannel = "csvChannel")
public void processCsv(Message<?> message) throws Exception {
    // The original File and its name arrive as message headers
    File filePath = message.getHeaders().get(FileHeaders.ORIGINAL_FILE, File.class);
    String fileName = message.getHeaders().get(FileHeaders.FILENAME, String.class);
    log.info("CSV File found ==> " + filePath.getPath());
}

That’s it; this was the basic Spring Integration implementation.

There are two more important components this app touches on: CSV validation, and processing the CSV file with the jackson-dataformat-csv API.

jackson-dataformat-csv

Read CSV: the file content is converted to a List<Map> here. There are ways to map the CSV to a Java POJO instead, but I find this one easy to use and implement:

public List<Map<String, String>> convertCSVtoMap(String fileContent) throws Exception {
    List<Map<String, String>> data = new ArrayList<>();

    CsvMapper csvMapper = new CsvMapper();
    csvMapper.enable(CsvParser.Feature.SKIP_EMPTY_LINES);

    // Use the first line as the header row; each data row becomes a Map
    Iterator<Map<String, String>> iterator = csvMapper
            .readerFor(Map.class)
            .with(CsvSchema.emptySchema().withHeader())
            .readValues(new StringReader(fileContent));

    while (iterator.hasNext())
        data.add(iterator.next());

    return data;
}
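
Inside the service activator, the polled file can be read into a string and handed to this method. A minimal usage sketch, assuming Java 11+ for Files.readString and the filePath header from earlier:

// Sketch: read the polled file and parse it (filePath from the message headers)
String fileContent = Files.readString(filePath.toPath(), StandardCharsets.UTF_8);
List<Map<String, String>> rows = convertCSVtoMap(fileContent);
log.info("Parsed " + rows.size() + " data rows");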

Similarly, you can write the same List<Map> back out to a CSV:

public void writeCSV(List<Map<String, String>> data, String path) throws Exception {
    log.info("Writing CSV to local at --> " + path);
    // createDirectories also handles missing intermediate directories
    if (!Files.exists(Paths.get(path).getParent()))
        Files.createDirectories(Paths.get(path).getParent());

    // Build the schema from the keys of the first row
    CsvSchema.Builder schemaBuilder = CsvSchema.builder();
    for (String col : data.get(0).keySet()) {
        schemaBuilder.addColumn(col);
    }
    CsvSchema schema = schemaBuilder.build().withHeader();

    CsvMapper mapper = new CsvMapper();
    mapper.configure(CsvGenerator.Feature.ALWAYS_QUOTE_STRINGS, true);

    // try-with-resources closes the writer even if writing fails
    try (FileWriter writer = new FileWriter(path)) {
        mapper.writer(schema).writeValues(writer).writeAll(data);
        writer.flush();
    }

    log.info("Written!");
}

CSV data validation: before processing the CSV, I validate the data using a configuration file. This can be implemented in many ways (with a POJO as well), but I find this approach easy to modify and extend.

First, I have a configuration file:

{
  "datepattern": "yyyy-MM-dd",
  "config": [
    { "csvheader": "order_number", "datatype": "Integer", "nullAllowed": "n" },
    { "csvheader": "order_date",   "datatype": "Date",    "nullAllowed": "n" },
    { "csvheader": "order_item",   "datatype": "String",  "nullAllowed": "n" },
    { "csvheader": "unit_price",   "datatype": "Double",  "nullAllowed": "n" },
    { "csvheader": "units",        "datatype": "Integer", "nullAllowed": "n" },
    { "csvheader": "discount",     "datatype": "Double",  "nullAllowed": "y" }
  ]
}
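
The post doesn’t show how this file is loaded; one straightforward option (a sketch, with illustrative class and file names) is to bind it to two small POJOs using Jackson’s ObjectMapper:

// Illustrative POJOs for the validation config (names are assumptions)
public class ValidationConfig {
    public String datepattern;
    public List<ColumnConfig> config;
}

public class ColumnConfig {
    public String csvheader;
    public String datatype;
    public String nullAllowed;
}

// Load once at startup; the file location is an assumption
ValidationConfig validationConfig = new ObjectMapper()
        .readValue(new File("validation-config.json"), ValidationConfig.class);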

And then I use these two dependencies to implement the data validations (again, there are a lot of other ways to implement the same checks):

implementation 'commons-validator:commons-validator:1.7'
implementation 'org.apache.commons:commons-lang3:3.12.0'

// order_number / units: digits only
private static boolean validateNumber(String value) {
    return NumberUtils.isDigits(value);
}

// order_date: must match the configured date pattern strictly
private static boolean validateDate(String value, String datePattern) {
    return GenericValidator.isDate(value, datePattern, true);
}

// unit_price / discount: a parseable number that is not a plain integer
private static boolean validateDouble(String value) {
    return NumberUtils.isCreatable(value) && !NumberUtils.isDigits(value);
}
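
Tying these together, a row-validation loop driven by the config could look like the sketch below (names are illustrative; in the real flow, failing files end up in the error directory):

// Sketch: validate every row against the column config (illustrative)
public List<String> validateRows(List<Map<String, String>> rows, ValidationConfig vc) {
    List<String> errors = new ArrayList<>();
    for (int i = 0; i < rows.size(); i++) {
        for (ColumnConfig col : vc.config) {
            String value = rows.get(i).get(col.csvheader);
            if (StringUtils.isBlank(value)) {
                // empty values are only an error when nulls are not allowed
                if ("n".equals(col.nullAllowed))
                    errors.add("Row " + (i + 1) + ": " + col.csvheader + " must not be empty");
                continue;
            }
            boolean valid = true;
            if ("Integer".equals(col.datatype))
                valid = validateNumber(value);
            else if ("Date".equals(col.datatype))
                valid = validateDate(value, vc.datepattern);
            else if ("Double".equals(col.datatype))
                valid = validateDouble(value);
            // "String" values need no format check
            if (!valid)
                errors.add("Row " + (i + 1) + ": invalid " + col.datatype + " for " + col.csvheader);
        }
    }
    return errors;
}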

The application configuration is specified in application.yaml:

app:
  csv_input_dir: ${CSV_INPUT_DIR:C:\Users\pbansal2\Downloads\}
  csv_file_pattern: ${CSV_FILE_PATTERN:*.csv}
  csv_processing_dir: ${CSV_PROCESSING_DIR:C:\Users\pbansal2\Downloads\processing\}
  csv_success_dir: ${CSV_SUCCESS_DIR:C:\Users\pbansal2\Downloads\success\}
  csv_error_dir: ${CSV_ERROR_DIR:C:\Users\pbansal2\Downloads\error\}
  csv_output_dir: ${CSV_OUTPUT_DIR:C:\Users\pbansal2\Downloads\output\}

So as soon as a file arrives in the input location, it is immediately moved to the processing directory. From there, the discount is calculated and the result is written to the output directory, and the original file is moved to the success directory. If there is any error, both the original file and an error.log are written to the error directory.
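
The sample data implies the discount is a percentage: 100 units at 10.00 each with a 10.00 discount yields 900.0. A sketch of that per-row calculation (the method name is illustrative):

// Sketch: compute final_price for each row; discount is treated as a
// percentage, matching the sample data (1000 with a 10.00 discount -> 900.0)
private void addFinalPrice(List<Map<String, String>> rows) {
    for (Map<String, String> row : rows) {
        double unitPrice = Double.parseDouble(row.get("unit_price"));
        int units = Integer.parseInt(row.get("units"));
        String discountStr = row.get("discount");
        double discount = StringUtils.isBlank(discountStr) ? 0.0 : Double.parseDouble(discountStr);
        row.put("final_price", String.valueOf(unitPrice * units * (1 - discount / 100.0)));
    }
}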

That’s it for now. Please leave your comments/feedback. Thank you!
