// ChainReplicationService.java 8.43 KB
/*
 * Decompiled with CFR 0_118.
 * 
 * Could not load the following classes:
 *  javax.jcr.RepositoryException
 *  javax.jcr.Session
 *  org.apache.felix.scr.annotations.Activate
 *  org.apache.felix.scr.annotations.Component
 *  org.apache.felix.scr.annotations.Deactivate
 *  org.apache.felix.scr.annotations.Properties
 *  org.apache.felix.scr.annotations.Property
 *  org.apache.felix.scr.annotations.Reference
 *  org.apache.felix.scr.annotations.Service
 *  org.apache.sling.jcr.api.SlingRepository
 *  org.osgi.service.event.Event
 *  org.osgi.service.event.EventHandler
 *  org.slf4j.Logger
 *  org.slf4j.LoggerFactory
 */
package com.day.cq.replication.impl;

import com.day.cq.replication.Agent;
import com.day.cq.replication.AgentConfig;
import com.day.cq.replication.AgentFilter;
import com.day.cq.replication.AgentNotFoundException;
import com.day.cq.replication.ReplicationAction;
import com.day.cq.replication.ReplicationActionType;
import com.day.cq.replication.ReplicationEvent;
import com.day.cq.replication.ReplicationException;
import com.day.cq.replication.ReplicationOptions;
import com.day.cq.replication.Replicator;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.jcr.RepositoryException;
import javax.jcr.Session;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Properties;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.Service;
import org.apache.sling.jcr.api.SlingRepository;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Event handler that "chain-replicates" content: replication events received on
 * the {@code com/adobe/granite/replication} topic (restricted by the event filter
 * to events without an {@code event.application} property) are queued and, once
 * the oldest queued entry is at least {@link #MAX_AGE} milliseconds old,
 * replicated again through every agent whose configuration reports
 * {@code isTriggeredOnReceive()}.
 *
 * <p>Events are queued by {@link #handleEvent(Event)} and drained by a single
 * daemon thread started in {@link #activate()}. The queue and the timestamp of
 * its oldest entry are only accessed while holding {@link #lock}.
 */
@Component(immediate=true)
@Properties(value={@Property(name="event.topics", value={"com/adobe/granite/replication"}), @Property(name="event.filter", value={"(!(event.application=*))"})})
@Service(value={EventHandler.class})
public class ChainReplicationService
implements EventHandler,
Runnable {
    /** Selects only the agents that are configured to be triggered on receive. */
    private static final AgentFilter RECEIVE_FILTER = new AgentFilter(){

        @Override
        public boolean isIncluded(Agent agent) {
            return agent.getConfiguration().isTriggeredOnReceive();
        }
    };

    /** Minimum age, in milliseconds, of the oldest queued entry before the queue is processed. */
    private static final long MAX_AGE = 2000;

    private final Logger log = LoggerFactory.getLogger(ChainReplicationService.class);

    @Reference
    private Replicator replicator;

    @Reference
    private SlingRepository repository;

    /** Pending replication actions; guarded by {@link #lock}. */
    private final List<ReplicationAction> modifications = new LinkedList<ReplicationAction>();

    private final Lock lock = new ReentrantLock();

    /** Signalled whenever an action is queued or the component is deactivated. */
    private final Condition guard = this.lock.newCondition();

    /**
     * {@link System#nanoTime()} reading taken when the first entry was added to an
     * empty queue; guarded by {@link #lock}. A monotonic clock is used so that
     * wall-clock adjustments cannot stretch or shorten the waiting period.
     */
    private long firstModified;

    /** {@code true} between activation and deactivation; read by the processor thread. */
    private volatile boolean active;

    /**
     * Marks the component active and starts the daemon thread that drains the queue.
     */
    @Activate
    protected void activate() {
        this.active = true;
        Thread t = new Thread(this, "Adobe Granite ChainReplicationService Processor");
        t.setDaemon(true);
        t.start();
    }

    /**
     * Marks the component inactive, discards all pending actions and wakes the
     * processor thread so that it can observe the changed state and terminate.
     */
    @Deactivate
    protected void deactivate() {
        // Must be cleared before taking the lock: the processor re-checks this flag
        // under the lock and relies on seeing the new value.
        this.active = false;
        this.lock.lock();
        try {
            this.modifications.clear();
            this.guard.signalAll();
        }
        finally {
            this.lock.unlock();
        }
    }

    /**
     * Processor loop: repeatedly takes a batch of queued actions and chain-replicates
     * it until the component is deactivated.
     */
    @Override
    public void run() {
        while (this.active) {
            List<ReplicationAction> batch = this.takeBatch();
            // The component may have been deactivated while the batch was collected.
            if (this.active && !batch.isEmpty()) {
                this.processBatch(batch);
            }
        }
    }

    /**
     * Performs one step of the queue protocol while holding the lock.
     *
     * <ul>
     * <li>If the component is inactive, returns immediately.</li>
     * <li>If the queue is empty, waits until signalled.</li>
     * <li>If the oldest entry is younger than {@link #MAX_AGE}, waits for the
     * remaining time (or until signalled).</li>
     * <li>Otherwise moves all queued actions into the returned list.</li>
     * </ul>
     *
     * @return the drained actions, or an empty list if the caller should simply
     *         try again (after a wait, an interrupt or a deactivation)
     */
    private List<ReplicationAction> takeBatch() {
        LinkedList<ReplicationAction> batch = new LinkedList<ReplicationAction>();
        this.lock.lock();
        try {
            // Re-check under the lock: deactivate() may have signalled before we got
            // here, in which case waiting below would never be woken up again.
            if (!this.active) {
                return batch;
            }
            if (this.modifications.isEmpty()) {
                this.log.debug("Queue is empty - waiting for modifications.");
                this.guard.await();
                return batch;
            }
            long age = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - this.firstModified);
            if (age < MAX_AGE) {
                long remaining = MAX_AGE - age;
                this.log.debug("Queue is too young. waiting another {}ms.", remaining);
                this.guard.await(remaining, TimeUnit.MILLISECONDS);
                return batch;
            }
            batch.addAll(this.modifications);
            this.modifications.clear();
            this.log.debug("Processing {} queue entries", batch.size());
            return batch;
        }
        catch (InterruptedException e) {
            // The interrupt flag is intentionally not restored: the loop keeps running
            // while the component is active, and a set flag would make the next await
            // throw immediately and spin.
            this.log.warn("Condition interrupted.", e);
            return batch;
        }
        finally {
            // await() re-acquires the lock before returning or throwing, so the lock
            // is always held here.
            this.lock.unlock();
        }
    }

    /**
     * Logs in as the {@code replicationService} service user, chain-replicates the
     * given actions and logs out again. Runs without holding the lock so that
     * {@link #handleEvent(Event)} is never blocked by replication.
     *
     * @param actions the actions to replicate
     */
    private void processBatch(List<ReplicationAction> actions) {
        Session session = null;
        try {
            session = this.repository.loginService("replicationService", null);
            this.chainReplicate(session, actions);
        }
        catch (RepositoryException e) {
            this.log.error("could not log in the repository", e);
        }
        catch (RuntimeException e) {
            // Keep the processor thread alive; losing it would silently stop all
            // further chain replication.
            this.log.error("Unexpected error while processing the replication queue", e);
        }
        finally {
            if (session != null) {
                session.logout();
            }
        }
    }

    /**
     * Replicates every action through the agents accepted by {@link #RECEIVE_FILTER}.
     * A failure for one action is logged and does not prevent the remaining actions
     * from being processed.
     *
     * @param session the session passed to the replicator
     * @param actions the actions to replicate
     */
    private void chainReplicate(Session session, List<ReplicationAction> actions) {
        for (ReplicationAction replicationAction : actions) {
            try {
                ReplicationOptions opts = new ReplicationOptions();
                opts.setFilter(RECEIVE_FILTER);
                this.log.debug("Chain-Replicating {} of {}", replicationAction.getType(), replicationAction.getPath());
                this.replicator.replicate(session, replicationAction.getType(), replicationAction.getPath(), opts);
                this.log.info("Chain-Replicated {} of {}", replicationAction.getType(), replicationAction.getPath());
            }
            catch (AgentNotFoundException e) {
                this.log.debug("No agent selected.");
            }
            catch (ReplicationException e) {
                this.log.warn("Error during replication.", e);
            }
            catch (Throwable e) {
                this.log.error("Unexpected error happened", e);
            }
        }
    }

    /**
     * Queues the replication action carried by the event if its type is
     * {@code ACTIVATE}, {@code DELETE} or {@code DEACTIVATE}, and signals the
     * processor thread. Events that are not replication events, and actions of
     * any other type, are ignored.
     *
     * @param event the received OSGi event
     */
    @Override
    public void handleEvent(Event event) {
        ReplicationEvent replicationEvent = ReplicationEvent.fromEvent(event);
        if (replicationEvent == null) {
            return;
        }
        ReplicationAction replicationAction = replicationEvent.getReplicationAction();
        ReplicationActionType type = replicationAction.getType();
        boolean relevant = ReplicationActionType.ACTIVATE.equals(type)
            || ReplicationActionType.DELETE.equals(type)
            || ReplicationActionType.DEACTIVATE.equals(type);
        if (!relevant) {
            return;
        }
        this.lock.lock();
        try {
            if (this.modifications.isEmpty()) {
                // Start of a new batch: remember when its oldest entry arrived.
                this.firstModified = System.nanoTime();
            }
            this.modifications.add(replicationAction);
            this.guard.signal();
        }
        finally {
            this.lock.unlock();
        }
    }

    /** Binds the replicator reference. */
    protected void bindReplicator(Replicator replicator) {
        this.replicator = replicator;
    }

    /** Unbinds the replicator reference if it is the currently bound instance. */
    protected void unbindReplicator(Replicator replicator) {
        if (this.replicator == replicator) {
            this.replicator = null;
        }
    }

    /** Binds the repository reference. */
    protected void bindRepository(SlingRepository slingRepository) {
        this.repository = slingRepository;
    }

    /** Unbinds the repository reference if it is the currently bound instance. */
    protected void unbindRepository(SlingRepository slingRepository) {
        if (this.repository == slingRepository) {
            this.repository = null;
        }
    }

}