S7DeliveryTransportHandler.java 9.87 KB
/*
 * Decompiled with CFR 0_118.
 * 
 * Could not load the following classes:
 *  com.day.cq.replication.AgentConfig
 *  com.day.cq.replication.ReplicationAction
 *  com.day.cq.replication.ReplicationActionType
 *  com.day.cq.replication.ReplicationContent
 *  com.day.cq.replication.ReplicationException
 *  com.day.cq.replication.ReplicationLog
 *  com.day.cq.replication.ReplicationResult
 *  com.day.cq.replication.ReplicationTransaction
 *  com.day.cq.replication.TransportContext
 *  com.day.cq.replication.TransportContext$Discardable
 *  com.day.cq.replication.TransportHandler
 *  com.scene7.is.catalog.service.publish.atomic.BatchStreamingPublishingService
 *  com.scene7.is.catalog.service.publish.atomic.PublishingService
 *  com.scene7.is.util.callbacks.Option
 *  com.scene7.is.util.collections.CollectionUtil
 *  org.apache.commons.io.IOUtils
 *  org.apache.felix.scr.annotations.Component
 *  org.apache.felix.scr.annotations.Reference
 *  org.apache.felix.scr.annotations.Service
 *  org.slf4j.Logger
 *  org.slf4j.LoggerFactory
 */
package com.adobe.cq.dam.aod.replication;

import com.adobe.cq.dam.aod.replication.Util;
import com.adobe.cq.dam.aod.replication.transport_config.ReplicationAgentProvider;
import com.day.cq.replication.AgentConfig;
import com.day.cq.replication.ReplicationAction;
import com.day.cq.replication.ReplicationActionType;
import com.day.cq.replication.ReplicationContent;
import com.day.cq.replication.ReplicationException;
import com.day.cq.replication.ReplicationLog;
import com.day.cq.replication.ReplicationResult;
import com.day.cq.replication.ReplicationTransaction;
import com.day.cq.replication.TransportContext;
import com.day.cq.replication.TransportHandler;
import com.scene7.is.catalog.service.publish.atomic.BatchStreamingPublishingService;
import com.scene7.is.catalog.service.publish.atomic.PublishingService;
import com.scene7.is.util.callbacks.Option;
import com.scene7.is.util.collections.CollectionUtil;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
import org.apache.commons.io.IOUtils;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Component
@Service(value={TransportHandler.class})
public final class S7DeliveryTransportHandler
implements TransportHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger(S7DeliveryTransportHandler.class);
    private static final Logger IGNORED_PATHS_LOGGER = LoggerFactory.getLogger((String)(S7DeliveryTransportHandler.class.getName() + "-ignored-paths"));
    @Reference
    private final ReplicationAgentProvider agentProvider;

    public static TransportHandler s7DeliveryTransportHandler(ReplicationAgentProvider agentProvider) {
        return new S7DeliveryTransportHandler(agentProvider);
    }

    public boolean canHandle(AgentConfig agentConfig) {
        return agentConfig.getTransportURI().startsWith("s7delivery");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public ReplicationResult deliver(TransportContext ctx, ReplicationTransaction tx) throws ReplicationException {
        long startTime = System.currentTimeMillis();
        S7DeliveryTransportHandler.info(tx, "Transferring content");
        Iterator i$ = S7DeliveryTransportHandler.getDataInput(tx).iterator();
        if (i$.hasNext()) {
            DataInputStream in = (DataInputStream)i$.next();
            try {
                PublishingService delegate = this.getPublishingService(ctx, tx.getLog());
                in = new DataInputStream(in);
                BatchStreamingPublishingService.receive((DataInput)in, (PublishingService)delegate);
                ReplicationResult replicationResult = S7DeliveryTransportHandler.replicationSuccess(tx, startTime);
                return replicationResult;
            }
            catch (Throwable e) {
                if (tx.getAction().getType().equals((Object)ReplicationActionType.TEST)) {
                    ReplicationResult replicationResult = S7DeliveryTransportHandler.replicationError(tx, e);
                    return replicationResult;
                }
                if (this.isPublishReceiverAvailable(ctx, tx.getLog())) {
                    LOGGER.error(e.getMessage(), e);
                    S7DeliveryTransportHandler.ignored(tx, "Failed to handle replication content (ignored): " + e.getMessage());
                    ReplicationResult replicationResult = S7DeliveryTransportHandler.replicationSuccess(tx, startTime);
                    return replicationResult;
                }
                ReplicationResult replicationResult = S7DeliveryTransportHandler.replicationError(tx, e);
                return replicationResult;
            }
            finally {
                IOUtils.closeQuietly((InputStream)in);
            }
        }
        return S7DeliveryTransportHandler.replicationSuccess(tx, startTime);
    }

    private S7DeliveryTransportHandler(ReplicationAgentProvider agentProvider) {
        this.agentProvider = agentProvider;
    }

    public S7DeliveryTransportHandler() {
        this.agentProvider = (ReplicationAgentProvider)Util.initRef();
    }

    private boolean isPublishReceiverAvailable(TransportContext ctx, ReplicationLog log) {
        try {
            PublishingService service = this.getPublishingService(ctx, log);
            service.test("");
            return true;
        }
        catch (ReplicationException e) {
            LOGGER.error("Error getting publish receiver instance: " + e.getMessage(), (Throwable)e);
            return false;
        }
    }

    private PublishingService getPublishingService(TransportContext ctx, ReplicationLog log) throws ReplicationException {
        State state;
        TransportContext.Discardable stateCandidate = ctx.getAttribute(this.getClass().getName());
        if (stateCandidate == null || !(stateCandidate instanceof State)) {
            state = new State(this.agentProvider.getAgent(ctx.getConfig(), log));
            ctx.setAttribute(this.getClass().getName(), (TransportContext.Discardable)state);
        } else {
            state = (State)stateCandidate;
        }
        return state.agent;
    }

    private static Option<DataInputStream> getDataInput(ReplicationTransaction tx) {
        ReplicationContent content = tx.getContent();
        if (content == null) {
            S7DeliveryTransportHandler.ignored(tx, "Replication content is null (ignored)");
            return Option.none();
        }
        if (content.equals((Object)ReplicationContent.VOID)) {
            return Option.none();
        }
        try {
            InputStream in = content.getInputStream();
            if (in == null) {
                S7DeliveryTransportHandler.ignored(tx, "Content input stream is null");
                return Option.none();
            }
            return Option.some((Object)new DataInputStream(in));
        }
        catch (IOException e) {
            LOGGER.error(e.getMessage(), (Throwable)e);
            S7DeliveryTransportHandler.ignored(tx, "Error getting replication content stream (ignored): " + e.getMessage());
            return Option.none();
        }
    }

    private static void info(ReplicationTransaction tx, String message) {
        String msg = message + " for " + (Object)tx.getAction();
        tx.getLog().info(msg);
        LOGGER.info(msg);
    }

    private static void ignored(ReplicationTransaction tx, String message) {
        String msg = message + " for " + (Object)tx.getAction();
        tx.getLog().info(msg);
        LOGGER.error(msg);
        IGNORED_PATHS_LOGGER.error(S7DeliveryTransportHandler.toString(tx));
    }

    private static String toString(ReplicationTransaction tx) {
        ReplicationContent content = tx.getContent();
        String type = ReplicationContent.VOID.equals((Object)content) ? "<VOID>" : content.getContentType();
        ReplicationAction action = tx.getAction();
        if (action != null) {
            String paths = CollectionUtil.mkString((Object[])action.getPaths(), (String)"[", (String)", ", (String)"]");
            return type + ':' + (Object)action.getType() + ':' + paths;
        }
        return type;
    }

    private static ReplicationResult replicationSuccess(ReplicationTransaction tx, long startTime) {
        long elapsedTime = System.currentTimeMillis() - startTime;
        String message = String.format("Transfer succeeded in %s ms for %s", new Object[]{elapsedTime, tx.getAction()});
        tx.getLog().info(message);
        LOGGER.info(message);
        return ReplicationResult.OK;
    }

    private static ReplicationResult replicationError(ReplicationTransaction tx, Throwable e) throws ReplicationException {
        String message = String.format("Transfer failed for %s. %s: %s", new Object[]{tx.getAction(), e.getClass().getName(), e.getMessage()});
        tx.getLog().error(message);
        LOGGER.error(message, e);
        if (e instanceof Error) {
            throw (Error)e;
        }
        if (e instanceof RuntimeException) {
            throw (RuntimeException)e;
        }
        if (e instanceof Exception) {
            throw new ReplicationException(message, (Exception)e);
        }
        throw new AssertionError(e);
    }

    protected void bindAgentProvider(ReplicationAgentProvider replicationAgentProvider) {
        this.agentProvider = replicationAgentProvider;
    }

    protected void unbindAgentProvider(ReplicationAgentProvider replicationAgentProvider) {
        if (this.agentProvider == replicationAgentProvider) {
            this.agentProvider = null;
        }
    }

    private static class State
    implements TransportContext.Discardable {
        private final PublishingService agent;

        State(PublishingService agent) {
            this.agent = agent;
        }

        public void discard() {
        }
    }

}