JcrClusterSupport.java
10.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
/*
* Decompiled with CFR 0_118.
*
* Could not load the following classes:
* javax.jcr.Repository
* org.apache.felix.scr.annotations.Activate
* org.apache.felix.scr.annotations.Component
* org.apache.felix.scr.annotations.ConfigurationPolicy
* org.apache.felix.scr.annotations.Deactivate
* org.apache.felix.scr.annotations.Reference
* org.apache.felix.scr.annotations.ReferenceCardinality
* org.apache.felix.scr.annotations.ReferencePolicy
* org.apache.sling.discovery.InstanceDescription
* org.apache.sling.discovery.TopologyEvent
* org.apache.sling.discovery.TopologyEvent$Type
* org.apache.sling.discovery.TopologyEventListener
* org.apache.sling.discovery.TopologyView
* org.slf4j.Logger
* org.slf4j.LoggerFactory
*/
package com.day.cq.jcrclustersupport.impl;
import com.day.cq.jcrclustersupport.ClusterAware;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jcr.Repository;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.ConfigurationPolicy;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.ReferencePolicy;
import org.apache.sling.discovery.InstanceDescription;
import org.apache.sling.discovery.TopologyEvent;
import org.apache.sling.discovery.TopologyEventListener;
import org.apache.sling.discovery.TopologyView;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Component(immediate=1, metatype=0, policy=ConfigurationPolicy.IGNORE)
public class JcrClusterSupport
implements Runnable,
TopologyEventListener {
private static final String REPO_DESC_ID = "crx.repository.systemid";
private static final String REPO_DESC_CLUSTER_ID = "crx.cluster.id";
private static final String REPO_DESC_IS_MASTER = "crx.cluster.master";
private static final String LEGACY_REPO_DESC_MASTER_ID = "com.day.crx.cluster.masterid";
private static final String LEGACY_REPO_DESC_NODE_ID = "com.day.crx.cluster.nodeid";
private final Logger log;
@Reference(cardinality=ReferenceCardinality.OPTIONAL_UNARY, policy=ReferencePolicy.DYNAMIC)
private Repository repository;
@Reference(cardinality=ReferenceCardinality.OPTIONAL_MULTIPLE, policy=ReferencePolicy.DYNAMIC)
private ClusterAware[] clusterAwares;
private boolean isMaster;
private final AtomicBoolean isLeader;
private String instanceId;
private Thread masterCheckerThread;
private final Object lock;
public JcrClusterSupport() {
this.log = LoggerFactory.getLogger(this.getClass());
this.clusterAwares = new ClusterAware[0];
this.isLeader = new AtomicBoolean(true);
this.lock = new Object();
}
public void run() {
while (this.masterCheckerThread != null) {
this.log.debug("run: Checking for master state change");
this.updateMaster();
try {
this.log.debug("run: Sleeping until next check");
Thread.sleep(15000);
}
catch (InterruptedException ie) {}
}
}
@Activate
protected void activate() {
this.masterCheckerThread = new Thread((Runnable)this, "Cluster Master Observer");
this.masterCheckerThread.setDaemon(true);
this.masterCheckerThread.start();
this.log.info("activate: Cluster Master Observer thread started");
}
@Deactivate
protected void deactivate() {
Thread observerThread = this.masterCheckerThread;
this.masterCheckerThread = null;
if (observerThread != null) {
observerThread.interrupt();
if (observerThread.isAlive()) {
try {
observerThread.join(2000);
}
catch (InterruptedException e) {
// empty catch block
}
if (observerThread.isAlive()) {
this.log.error("deactivate: Cluster Master Observer thread did not stop in time ...");
}
}
}
this.log.info("activate: Cluster Master Observer thread stopped");
}
/*
* WARNING - Removed try catching itself - possible behaviour change.
*/
private void bindRepository(Repository repository) {
ClusterAware[] clusterAwares;
this.log.debug("bindRepository: Binding repository {}", (Object)repository);
Object object = this.lock;
synchronized (object) {
this.repository = repository;
this.isMaster = this.isMaster(repository);
clusterAwares = this.clusterAwares;
}
this.firstAccess(repository, clusterAwares);
}
/*
* WARNING - Removed try catching itself - possible behaviour change.
*/
private void unbindRepository(Repository repository) {
ClusterAware[] clusterAwares;
this.log.debug("unbindRepository: Releasing repository {}", (Object)repository);
Object object = this.lock;
synchronized (object) {
if (this.repository == repository) {
this.repository = null;
clusterAwares = this.clusterAwares;
} else {
clusterAwares = null;
}
}
if (clusterAwares != null) {
for (ClusterAware clusterAware : clusterAwares) {
clusterAware.unbindRepository();
}
}
}
/*
* WARNING - Removed try catching itself - possible behaviour change.
*/
private void bindClusterAware(ClusterAware clusterAware) {
Repository repository;
this.log.debug("bindClusterAware: Binding ClusterAware {}", (Object)clusterAware);
Object object = this.lock;
synchronized (object) {
ArrayList<ClusterAware> currentList = new ArrayList<ClusterAware>(Arrays.asList(this.clusterAwares));
currentList.add(clusterAware);
this.clusterAwares = currentList.toArray(new ClusterAware[currentList.size()]);
repository = this.repository;
}
if (repository != null) {
this.firstAccess(repository, new ClusterAware[]{clusterAware});
} else {
clusterAware.unbindRepository();
}
}
/*
* WARNING - Removed try catching itself - possible behaviour change.
*/
private void unbindClusterAware(ClusterAware clusterAware) {
this.log.debug("unbindClusterAware: Releasing ClusterAware {}", (Object)clusterAware);
Object object = this.lock;
synchronized (object) {
ArrayList<ClusterAware> currentList = new ArrayList<ClusterAware>(Arrays.asList(this.clusterAwares));
currentList.remove(clusterAware);
this.clusterAwares = currentList.toArray(new ClusterAware[currentList.size()]);
}
}
private void firstAccess(Repository repository, ClusterAware[] clusterAwares) {
boolean isMaster = this.isMaster(repository);
String repositoryId = this.getRepoId(repository);
String clusterId = JcrClusterSupport.getClusterId(repository);
this.log.debug("firstAccess: Repository id={}, cluster={}, isMaster={}", new Object[]{repositoryId, clusterId, isMaster});
for (ClusterAware clusterAware : clusterAwares) {
clusterAware.bindRepository(repositoryId, clusterId, isMaster);
}
}
/*
* WARNING - Removed try catching itself - possible behaviour change.
*/
private void updateMaster() {
ClusterAware[] clusterAwares;
boolean isMaster;
Repository repository;
Object object = this.lock;
synchronized (object) {
repository = this.repository;
clusterAwares = this.clusterAwares;
isMaster = this.isMaster;
}
if (repository != null) {
boolean actualIsMaster = this.isMaster(repository);
if (actualIsMaster != isMaster) {
this.log.info("updateMaster: Switching master state {} => {}", (Object)isMaster, (Object)actualIsMaster);
Object object2 = this.lock;
synchronized (object2) {
this.isMaster = actualIsMaster;
}
String repositoryId = this.getRepoId(repository);
String clusterId = JcrClusterSupport.getClusterId(repository);
for (ClusterAware clusterAware : clusterAwares) {
clusterAware.bindRepository(repositoryId, clusterId, actualIsMaster);
}
} else {
this.log.debug("updateMaster: Master state unchanged: {}", (Object)isMaster);
}
}
}
private static boolean isLegagy(Repository repository) {
return repository.getDescriptor("jcr.specification.version").startsWith("1");
}
private static boolean isOak(Repository repository) {
return repository.getDescriptor("crx.cluster.master") == null;
}
private boolean isMaster(Repository repository) {
if (JcrClusterSupport.isLegagy(repository)) {
String masterId = repository.getDescriptor("com.day.crx.cluster.masterid");
String nodeId = repository.getDescriptor("com.day.crx.cluster.nodeid");
return masterId.equals(nodeId);
}
if (JcrClusterSupport.isOak(repository)) {
return this.isLeader.get();
}
String isMasterDesc = repository.getDescriptor("crx.cluster.master");
return Boolean.parseBoolean(isMasterDesc);
}
private String getRepoId(Repository repository) {
if (JcrClusterSupport.isLegagy(repository)) {
return repository.getDescriptor("com.day.crx.cluster.nodeid");
}
if (repository.getDescriptor("crx.repository.systemid") != null) {
return repository.getDescriptor("crx.repository.systemid");
}
return this.instanceId;
}
private static String getClusterId(Repository repository) {
if (JcrClusterSupport.isLegagy(repository)) {
return "legacy cluster";
}
if (JcrClusterSupport.isOak(repository)) {
return "oak cluster";
}
return repository.getDescriptor("crx.cluster.id");
}
public void handleTopologyEvent(TopologyEvent event) {
if (event.getType() == TopologyEvent.Type.TOPOLOGY_INIT) {
this.instanceId = event.getNewView().getLocalInstance().getSlingId();
}
if (event.getType() == TopologyEvent.Type.TOPOLOGY_CHANGED || event.getType() == TopologyEvent.Type.TOPOLOGY_INIT) {
this.isLeader.set(event.getNewView().getLocalInstance().isLeader());
}
}
}