How To Stream/Serialize JPA Result As JAX-RS Response For Large Data

Posted 2016-3-30 23:01:17
There are times when retrieving a large data set through JPA is necessary (e.g. more than 1,000,000 records), and stuffing all of it into a single java.util.List is risky because it can exhaust the available memory. So here is a quick solution showing how a JAX-RS REST resource end-point can still give us a timely Response without breaking the memory constraint: it streams (serializes) the JPA entities in “pages”.
Example Database Table And JPA Entity
Database Table
To demonstrate how we can output a large amount of data, here is an example MySQL database table:

create database large_data_test_db;
use large_data_test_db;

create table generated_uuids (
    record_no bigint not null auto_increment,
    uuid varchar(100) not null,
    datetime_generated datetime not null,
    primary key(record_no),
    unique(uuid)
);

Last edited by elathen on 2015-7-1 22:24
JPA Entity
Next, define the JPA entity class that represents the table structure above.
Code for GeneratedUuidEntity.java:

package com.developerscrappad;

import java.io.Serializable;
import java.util.Date;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.NamedQueries;
import javax.persistence.NamedQuery;
import javax.persistence.Table;
import javax.persistence.Temporal;
import javax.persistence.TemporalType;

@Entity
@Table( name = "generated_uuids" )
@NamedQueries( {
    // ORDER BY keeps paging with setFirstResult()/setMaxResults() deterministic across round trips
    @NamedQuery( name = "GeneratedUuidEntity.listAll", query = "SELECT u FROM GeneratedUuidEntity u ORDER BY u.recordNo" ),
    @NamedQuery( name = "GeneratedUuidEntity.queryRecordsSize", query = "SELECT count(u) FROM GeneratedUuidEntity u" )
} )
public class GeneratedUuidEntity implements Serializable {

    private static final long serialVersionUID = 12312312234234123L;

    @Id
    @GeneratedValue( strategy = GenerationType.IDENTITY )
    @Column( name = "record_no" )
    private Long recordNo;

    @Column( name = "uuid" )
    private String uuid;

    @Column( name = "datetime_generated" )
    @Temporal( TemporalType.TIMESTAMP )
    private Date datetimeGenerated;

    public GeneratedUuidEntity() {
    }

    public GeneratedUuidEntity( Long recordNo ) {
        this.recordNo = recordNo;
    }

    public GeneratedUuidEntity( Long recordNo, String uuid, Date datetimeGenerated ) {
        this.recordNo = recordNo;
        this.uuid = uuid;
        this.datetimeGenerated = datetimeGenerated;
    }

    public Long getRecordNo() {
        return recordNo;
    }

    public void setRecordNo( Long recordNo ) {
        this.recordNo = recordNo;
    }

    public String getUuid() {
        return uuid;
    }

    public void setUuid( String uuid ) {
        this.uuid = uuid;
    }

    public Date getDatetimeGenerated() {
        return datetimeGenerated;
    }

    public void setDatetimeGenerated( Date datetimeGenerated ) {
        this.datetimeGenerated = datetimeGenerated;
    }

    @Override
    public int hashCode() {
        int hash = 0;
        hash += ( recordNo != null ? recordNo.hashCode() : 0 );
        return hash;
    }

    @Override
    public boolean equals( Object object ) {
        // TODO: Warning - this method won't work in the case the id fields are not set
        if ( !( object instanceof GeneratedUuidEntity ) ) {
            return false;
        }
        GeneratedUuidEntity other = ( GeneratedUuidEntity ) object;
        if ( ( this.recordNo == null && other.recordNo != null ) || ( this.recordNo != null && !this.recordNo.equals( other.recordNo ) ) ) {
            return false;
        }
        return true;
    }

    @Override
    public String toString() {
        return "com.developerscrappad.GeneratedUuidEntity[ recordNo=" + recordNo + " ]";
    }
}

GeneratedUuidEntity defines two named queries. GeneratedUuidEntity.queryRecordsSize counts the total number of records in the table. GeneratedUuidEntity.listAll retrieves all of the records, ordered by recordNo so that each page fetched later is stable from one round trip to the next.

Implementing The JAX-RS REST Resource (The Java EE way)
Let’s create a JAX-RS REST resource class named JPAStreamingRESTResource. It has a JPA EntityManager (persistence unit name: JPAStreamingPU) injected into it, exposed through a protected method, getEntityManager().
@Path( "generated-uuids" )
@Stateless( name = "JPAStreamingRESTResource", mappedName = "ejb/JPAStreamingRESTResource" )
public class JPAStreamingRESTResource {

    @PersistenceContext( unitName = "JPAStreamingPU" )
    private EntityManager entityManager;

    protected EntityManager getEntityManager() {
        return entityManager;
    }

    /**
     * Say "NO" to response caching
     */
    protected Response.ResponseBuilder getNoCacheResponseBuilder( Response.Status status ) {
        CacheControl cc = new CacheControl();
        cc.setNoCache( true );
        cc.setMaxAge( -1 );
        cc.setMustRevalidate( true );
        return Response.status( status ).cacheControl( cc );
    }
}

We also define a method named getNoCacheResponseBuilder(), which returns a non-caching javax.ws.rs.core.Response.ResponseBuilder. This ensures we don't get stale cached results later.

The JPA Invocation Methods
Next, let’s define two methods within the resource class, namely:
queryGeneratedUuidRecordsSize() – retrieves the total number of records in the table:

private int queryGeneratedUuidRecordsSize() {
    return getEntityManager().createNamedQuery( "GeneratedUuidEntity.queryRecordsSize", Long.class )
            .getSingleResult().intValue();
}

listAllGeneratedUuidEntities() – retrieves the data from the table, limited by the start position of the record (recordPosition) and the maximum number of records per round trip to the database (recordsPerRoundTrip). The intention is to “page” the results so that no single result list becomes overly bloated. We'll see this in action later.

private List<GeneratedUuidEntity> listAllGeneratedUuidEntities( int recordPosition, int recordsPerRoundTrip ) {
    return getEntityManager().createNamedQuery( "GeneratedUuidEntity.listAll", GeneratedUuidEntity.class )
            .setFirstResult( recordPosition )
            .setMaxResults( recordsPerRoundTrip )
            .getResultList();
}

Let The Streaming Begin
Now, let's implement the resource end-point method. It returns all of the data regardless of its size (at least in theory), because it never holds more than one page in memory. This method returns a JSON response in the following format:
{"result": [
    {
        "record_no": 1,
        "uuid": "34d99089-3e36-4f00-ab93-846b61771eb3",
        "datetime_generated": "2015-06-28 21:02:23"
    },
    …
]}

@GET
@Path( "list-all" )
@Produces( "application/json" )
@TransactionAttribute( TransactionAttributeType.NEVER )
public Response streamGeneratedUuids() {

    // Define the format of timestamp output
    final SimpleDateFormat df = new SimpleDateFormat( "yyyy-MM-dd HH:mm:ss" );

    return getNoCacheResponseBuilder( Response.Status.OK ).entity( new StreamingOutput() {

        // Instruct how StreamingOutput's write method is to stream the data
        @Override
        public void write( OutputStream os ) throws IOException, WebApplicationException {
            int recordsPerRoundTrip = 100;                    // Number of records for every round trip to the database
            int recordPosition = 0;                           // Initial record position index
            int recordSize = queryGeneratedUuidRecordsSize(); // Total records found for the query

            // Start streaming the data (as UTF-8 rather than the platform default charset)
            try ( PrintWriter writer = new PrintWriter( new BufferedWriter( new OutputStreamWriter( os, StandardCharsets.UTF_8 ) ) ) ) {

                writer.print( "{\"result\": [" );

                while ( recordSize > 0 ) {
                    // Get the paged data set from the DB
                    List<GeneratedUuidEntity> generatedUuidEntities = listAllGeneratedUuidEntities( recordPosition, recordsPerRoundTrip );

                    for ( GeneratedUuidEntity generatedUuidEntity : generatedUuidEntities ) {
                        if ( recordPosition > 0 ) {
                            writer.print( "," );
                        }

                        // Stream the data in JSON object format
                        writer.print( Json.createObjectBuilder()
                                .add( "record_no", generatedUuidEntity.getRecordNo() )
                                .add( "uuid", generatedUuidEntity.getUuid() )
                                .add( "datetime_generated", df.format( generatedUuidEntity.getDatetimeGenerated() ) )
                                .build().toString() );

                        // Increase the recordPosition for every record streamed
                        recordPosition++;
                    }

                    // Update the recordSize (remaining no. of records)
                    recordSize -= recordsPerRoundTrip;
                }

                // Done!
                writer.print( "]}" );
            }
        }
    } ).build();
}

Code Explanation:
This is quite simple, actually. The trick is to implement StreamingOutput as an anonymous class and override its write() method. That method first queries the total record count through queryGeneratedUuidRecordsSize(), then retrieves the records page by page through listAllGeneratedUuidEntities(). It therefore makes several round trips to the database; how many depends on the recordsPerRoundTrip value.
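To make the round-trip arithmetic concrete, here is a minimal, container-free sketch of the offsets that setFirstResult() receives for a given record count and recordsPerRoundTrip. The class and method names (PagingPlan, offsets, lastPageSize) are hypothetical. Consider the 567 records used in the test further down, with 100 records per round trip. The article's while loop runs six times, and the last round trip returns 67 records.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: computes the paging plan that the streaming loop follows.
public class PagingPlan {

    /** Offsets passed to setFirstResult() on each round trip, in order. */
    public static List<Integer> offsets( int totalRecords, int recordsPerRoundTrip ) {
        List<Integer> result = new ArrayList<>();
        for ( int offset = 0; offset < totalRecords; offset += recordsPerRoundTrip ) {
            result.add( offset );
        }
        return result;
    }

    /** Number of rows returned by the final round trip (0 for an empty table). */
    public static int lastPageSize( int totalRecords, int recordsPerRoundTrip ) {
        if ( totalRecords == 0 ) {
            return 0;
        }
        int remainder = totalRecords % recordsPerRoundTrip;
        return remainder == 0 ? recordsPerRoundTrip : remainder;
    }

    public static void main( String[] args ) {
        // 567 records, 100 per round trip
        List<Integer> plan = offsets( 567, 100 );
        System.out.println( plan );                         // [0, 100, 200, 300, 400, 500]
        System.out.println( plan.size() + " round trips" ); // 6 round trips
        System.out.println( lastPageSize( 567, 100 ) );     // 67
    }
}
```

Raising recordsPerRoundTrip reduces the number of queries at the cost of a larger page held in memory per round trip.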
Full source code for JPAStreamingRESTResource.java:

package com.developerscrappad;

import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.List;
import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import javax.json.Json;
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.CacheControl;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;

@Path( "generated-uuids" )
@Stateless( name = "JPAStreamingRESTResource", mappedName = "ejb/JPAStreamingRESTResource" )
public class JPAStreamingRESTResource {

    @PersistenceContext( unitName = "JPAStreamingPU" )
    private EntityManager entityManager;

    private List<GeneratedUuidEntity> listAllGeneratedUuidEntities( int recordPosition, int recordsPerRoundTrip ) {
        return getEntityManager().createNamedQuery( "GeneratedUuidEntity.listAll", GeneratedUuidEntity.class )
                .setFirstResult( recordPosition )
                .setMaxResults( recordsPerRoundTrip )
                .getResultList();
    }

    private int queryGeneratedUuidRecordsSize() {
        return getEntityManager().createNamedQuery( "GeneratedUuidEntity.queryRecordsSize", Long.class )
                .getSingleResult().intValue();
    }

    protected EntityManager getEntityManager() {
        return entityManager;
    }

    /**
     * Say "NO" to response caching
     */
    protected Response.ResponseBuilder getNoCacheResponseBuilder( Response.Status status ) {
        CacheControl cc = new CacheControl();
        cc.setNoCache( true );
        cc.setMaxAge( -1 );
        cc.setMustRevalidate( true );
        return Response.status( status ).cacheControl( cc );
    }

    @GET
    @Path( "list-all" )
    @Produces( "application/json" )
    @TransactionAttribute( TransactionAttributeType.NEVER )
    public Response streamGeneratedUuids() {

        // Define the format of timestamp output
        final SimpleDateFormat df = new SimpleDateFormat( "yyyy-MM-dd HH:mm:ss" );

        return getNoCacheResponseBuilder( Response.Status.OK ).entity( new StreamingOutput() {

            // Instruct how StreamingOutput's write method is to stream the data
            @Override
            public void write( OutputStream os ) throws IOException, WebApplicationException {
                int recordsPerRoundTrip = 100;                    // Number of records for every round trip to the database
                int recordPosition = 0;                           // Initial record position index
                int recordSize = queryGeneratedUuidRecordsSize(); // Total records found for the query

                // Start streaming the data (as UTF-8 rather than the platform default charset)
                try ( PrintWriter writer = new PrintWriter( new BufferedWriter( new OutputStreamWriter( os, StandardCharsets.UTF_8 ) ) ) ) {

                    writer.print( "{\"result\": [" );

                    while ( recordSize > 0 ) {
                        // Get the paged data set from the DB
                        List<GeneratedUuidEntity> generatedUuidEntities = listAllGeneratedUuidEntities( recordPosition, recordsPerRoundTrip );

                        for ( GeneratedUuidEntity generatedUuidEntity : generatedUuidEntities ) {
                            if ( recordPosition > 0 ) {
                                writer.print( "," );
                            }

                            // Stream the data in JSON object format
                            writer.print( Json.createObjectBuilder()
                                    .add( "record_no", generatedUuidEntity.getRecordNo() )
                                    .add( "uuid", generatedUuidEntity.getUuid() )
                                    .add( "datetime_generated", df.format( generatedUuidEntity.getDatetimeGenerated() ) )
                                    .build().toString() );

                            // Increase the recordPosition for every record streamed
                            recordPosition++;
                        }

                        // Update the recordSize (remaining no. of records)
                        recordSize -= recordsPerRoundTrip;
                    }

                    // Done!
                    writer.print( "]}" );
                }
            }
        } ).build();
    }
}
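The same page-at-a-time pattern can be tried without an application server. The sketch below is a plain-Java approximation, not the author's code. PagedJsonStreamer and PageFetcher are hypothetical names: PageFetcher stands in for listAllGeneratedUuidEntities(), and each row is assumed to be an already-serialized JSON object rather than a JSON-P object. One deliberate difference from the resource above is the stopping rule. Instead of counting rows up front with a separate query, it stops as soon as a page comes back shorter than the page size.

```java
import java.io.BufferedWriter;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class PagedJsonStreamer {

    /** Hypothetical stand-in for a paged query: at most `limit` rows starting at `offset`. */
    public interface PageFetcher {
        List<String> fetch( int offset, int limit );
    }

    /** Writes {"result": [...]} while holding at most one page of rows in memory. */
    public static void stream( OutputStream os, PageFetcher fetcher, int pageSize ) throws IOException {
        Writer writer = new BufferedWriter( new OutputStreamWriter( os, StandardCharsets.UTF_8 ) );
        writer.write( "{\"result\": [" );
        int position = 0;
        while ( true ) {
            List<String> page = fetcher.fetch( position, pageSize );
            for ( String jsonObject : page ) {
                if ( position > 0 ) {
                    writer.write( "," ); // separator before every element except the first
                }
                writer.write( jsonObject );
                position++;
            }
            // A short (or empty) page means there is nothing left to fetch
            if ( page.size() < pageSize ) {
                break;
            }
        }
        writer.write( "]}" );
        writer.flush(); // flush, but leave closing the stream to the caller
    }

    public static void main( String[] args ) throws IOException {
        // Five fake rows served from memory, three per "round trip"
        List<String> rows = new ArrayList<>();
        for ( int i = 1; i <= 5; i++ ) {
            rows.add( "{\"record_no\":" + i + "}" );
        }
        PageFetcher fetcher = ( offset, limit ) ->
                rows.subList( Math.min( offset, rows.size() ), Math.min( offset + limit, rows.size() ) );

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        stream( out, fetcher, 3 );
        // {"result": [{"record_no":1},{"record_no":2},{"record_no":3},{"record_no":4},{"record_no":5}]}
        System.out.println( new String( out.toByteArray(), StandardCharsets.UTF_8 ) );
    }
}
```

When the row count is an exact multiple of the page size, this version makes one extra, empty round trip. That is the price of skipping the count query.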

Watch Out
Do remember to tune the application server’s response connection timeout. Otherwise, a long-running stream may be cut off, and the REST or HTTP client will throw a java.io.IOException: Premature EOF.
Testing It
To test whether this works, load the table with 567 records. Then have the unit test invoke the end-point URL and save the retrieved JSON data to a file, using the unit test code below (Apache HttpClient is used):
Code for JPAStreamingUnitTest.java:

package com.developerscrappad;

import java.io.File;
import java.io.FileInputStream;
import static org.junit.Assert.*;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.UUID;
import javax.json.Json;
import javax.json.JsonArray;
import javax.json.JsonObject;
import javax.json.JsonReader;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

public class JPAStreamingUnitTest {

    private static final String dbDriverClassname = "com.mysql.jdbc.Driver";
    private static final String dbUrl = "jdbc:mysql://localhost:3306/large_data_test_db";
    private static final String username = "username";
    private static final String password = "password";
    private static final int numberOfRecords = 567;
    private static final String jsonResultOutputFilename = "testing123.json";

    @BeforeClass
    public static void setUpClass() {
        try {
            Class.forName( dbDriverClassname );

            try ( Connection conn = DriverManager.getConnection( dbUrl, username, password ) ) {
                String insertSQL = "insert into generated_uuids (uuid, datetime_generated) values (?, now())";

                try ( PreparedStatement stmt = conn.prepareStatement( insertSQL ) ) {
                    for ( int i = 0; i < numberOfRecords; i++ ) {

    // ... (the source listing is truncated from here up to the per-record checks below) ...

                        try {
                            UUID.fromString( generatedUuidJsonObj.getString( "uuid" ) );
                        } catch ( IllegalArgumentException ex ) {
                            fail( "Invalid UUID format at record number: " + recordNumber );
                        }

                        try {
                            validationDF.parse( generatedUuidJsonObj.getString( "datetime_generated" ) );
                        } catch ( final NullPointerException | ParseException ex ) {
                            fail( "datetime_generated field must not be null and must be of format yyyy-MM-dd HH:mm:ss" );
                        }
                    }
                }
            }
        } catch ( final Exception ex ) {
            ex.printStackTrace();
            fail( ex.getMessage() );
        }
    }
}

And we are done. Thanks for reading, and I hope this helps.
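The per-record checks in the test (a valid UUID and a yyyy-MM-dd HH:mm:ss timestamp) can also be tried in isolation. The sketch below is a plain-Java variant, not the author's test; the class and method names are hypothetical. It tightens both checks, because UUID.fromString() also accepts short forms such as "1-2-3-4-5", and a default SimpleDateFormat is lenient and ignores trailing text.

```java
import java.text.ParsePosition;
import java.text.SimpleDateFormat;
import java.util.Locale;
import java.util.UUID;

public class RecordChecks {

    /** True if the value is a UUID in canonical 8-4-4-4-12 form (case-insensitive). */
    public static boolean isCanonicalUuid( String value ) {
        if ( value == null ) {
            return false;
        }
        try {
            // Round-trip: fromString() accepts short groups, toString() always prints the canonical form
            return UUID.fromString( value ).toString().equals( value.toLowerCase( Locale.ROOT ) );
        } catch ( IllegalArgumentException ex ) {
            return false;
        }
    }

    /** True if the whole value parses as yyyy-MM-dd HH:mm:ss with non-lenient field checks. */
    public static boolean isTimestamp( String value ) {
        if ( value == null ) {
            return false;
        }
        SimpleDateFormat df = new SimpleDateFormat( "yyyy-MM-dd HH:mm:ss" );
        df.setLenient( false ); // reject out-of-range fields such as month 13 or hour 25
        ParsePosition pos = new ParsePosition( 0 );
        // parse(String, ParsePosition) returns null on failure; the index check rejects trailing text
        return df.parse( value, pos ) != null && pos.getIndex() == value.length();
    }

    public static void main( String[] args ) {
        System.out.println( isCanonicalUuid( "34d99089-3e36-4f00-ab93-846b61771eb3" ) ); // true
        System.out.println( isCanonicalUuid( "1-2-3-4-5" ) );                            // false
        System.out.println( isTimestamp( "2015-06-28 21:02:23" ) );                       // true
        System.out.println( isTimestamp( "2015-13-45 25:61:61" ) );                       // false
    }
}
```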