-
Notifications
You must be signed in to change notification settings - Fork 2.3k
/
Neo4jItemReader.java
227 lines (197 loc) · 6.91 KB
/
Neo4jItemReader.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
/*
* Copyright 2012-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.item.data;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.neo4j.ogm.session.Session;
import org.neo4j.ogm.session.SessionFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
/**
* <p>
* Restartable {@link ItemReader} that reads objects from the graph database Neo4j
* via a paging technique.
* </p>
*
* <p>
* It executes cypher queries built from the statement fragments provided to
* retrieve the requested data. The query is executed using paged requests of
* a size specified in {@link #setPageSize(int)}. Additional pages are requested
* as needed when the {@link #read()} method is called. On restart, the reader
* will begin again at the same number item it left off at.
* </p>
*
* <p>
* Performance is dependent on your Neo4J configuration (embedded or remote) as
* well as page size. Setting a fairly large page size and using a commit
* interval that matches the page size should provide better performance.
* </p>
*
* <p>
* This implementation is thread-safe between calls to
* {@link #open(org.springframework.batch.item.ExecutionContext)}, however you
* should set <code>saveState=false</code> if used in a multi-threaded
* environment (no restart available).
* </p>
*
* @author Michael Minella
* @author Mahmoud Ben Hassine
*
* @deprecated since 5.0 in favor of the item reader from
* https://github.com/spring-projects/spring-batch-extensions/blob/main/spring-batch-neo4j
*/
@Deprecated
public class Neo4jItemReader<T> extends AbstractPaginatedDataItemReader<T> implements InitializingBean {
protected Log logger = LogFactory.getLog(getClass());
private SessionFactory sessionFactory;
private String startStatement;
private String returnStatement;
private String matchStatement;
private String whereStatement;
private String orderByStatement;
private Class<T> targetType;
private Map<String, Object> parameterValues;
/**
* Optional parameters to be used in the cypher query.
*
* @param parameterValues the parameter values to be used in the cypher query
*/
public void setParameterValues(Map<String, Object> parameterValues) {
this.parameterValues = parameterValues;
}
protected final Map<String, Object> getParameterValues() {
return this.parameterValues;
}
/**
* The start segment of the cypher query. START is prepended
* to the statement provided and should <em>not</em> be
* included.
*
* @param startStatement the start fragment of the cypher query.
*/
public void setStartStatement(String startStatement) {
this.startStatement = startStatement;
}
/**
* The return statement of the cypher query. RETURN is prepended
* to the statement provided and should <em>not</em> be
* included
*
* @param returnStatement the return fragment of the cypher query.
*/
public void setReturnStatement(String returnStatement) {
this.returnStatement = returnStatement;
}
/**
* An optional match fragment of the cypher query. MATCH is
* prepended to the statement provided and should <em>not</em>
* be included.
*
* @param matchStatement the match fragment of the cypher query
*/
public void setMatchStatement(String matchStatement) {
this.matchStatement = matchStatement;
}
/**
* An optional where fragment of the cypher query. WHERE is
* prepended to the statement provided and should <em>not</em>
* be included.
*
* @param whereStatement where fragment of the cypher query
*/
public void setWhereStatement(String whereStatement) {
this.whereStatement = whereStatement;
}
/**
* A list of properties to order the results by. This is
* required so that subsequent page requests pull back the
* segment of results correctly. ORDER BY is prepended to
* the statement provided and should <em>not</em> be included.
*
* @param orderByStatement order by fragment of the cypher query.
*/
public void setOrderByStatement(String orderByStatement) {
this.orderByStatement = orderByStatement;
}
protected SessionFactory getSessionFactory() {
return sessionFactory;
}
/**
* Establish the session factory for the reader.
* @param sessionFactory the factory to use for the reader.
*/
public void setSessionFactory(SessionFactory sessionFactory) {
this.sessionFactory = sessionFactory;
}
/**
* The object type to be returned from each call to {@link #read()}
*
* @param targetType the type of object to return.
*/
public void setTargetType(Class<T> targetType) {
this.targetType = targetType;
}
protected final Class<T> getTargetType() {
return this.targetType;
}
protected String generateLimitCypherQuery() {
StringBuilder query = new StringBuilder(128);
query.append("START ").append(startStatement);
query.append(matchStatement != null ? " MATCH " + matchStatement : "");
query.append(whereStatement != null ? " WHERE " + whereStatement : "");
query.append(" RETURN ").append(returnStatement);
query.append(" ORDER BY ").append(orderByStatement);
query.append(" SKIP " + (pageSize * page));
query.append(" LIMIT " + pageSize);
String resultingQuery = query.toString();
if (logger.isDebugEnabled()) {
logger.debug(resultingQuery);
}
return resultingQuery;
}
/**
* Checks mandatory properties
*
* @see InitializingBean#afterPropertiesSet()
*/
@Override
public void afterPropertiesSet() throws Exception {
Assert.state(sessionFactory != null,"A SessionFactory is required");
Assert.state(targetType != null, "The type to be returned is required");
Assert.state(StringUtils.hasText(startStatement), "A START statement is required");
Assert.state(StringUtils.hasText(returnStatement), "A RETURN statement is required");
Assert.state(StringUtils.hasText(orderByStatement), "A ORDER BY statement is required");
}
@SuppressWarnings("unchecked")
@Override
protected Iterator<T> doPageRead() {
Session session = getSessionFactory().openSession();
Iterable<T> queryResults = session.query(getTargetType(),
generateLimitCypherQuery(),
getParameterValues());
if(queryResults != null) {
return queryResults.iterator();
}
else {
return new ArrayList<T>().iterator();
}
}
}