SocketIOSocketImpl.java 12.1 KB
/*
 * Decompiled with CFR 0_118.
 * 
 * Could not load the following classes:
 *  javax.annotation.Nonnull
 *  javax.annotation.Nullable
 *  org.apache.sling.commons.json.JSONArray
 *  org.slf4j.Logger
 *  org.slf4j.LoggerFactory
 */
package com.adobe.granite.socketio.impl;

import com.adobe.granite.socketio.SocketIOAck;
import com.adobe.granite.socketio.SocketIOAckListener;
import com.adobe.granite.socketio.SocketIOEmitter;
import com.adobe.granite.socketio.SocketIONamespace;
import com.adobe.granite.socketio.SocketIOSocket;
import com.adobe.granite.socketio.SocketIOSocketListener;
import com.adobe.granite.socketio.impl.Client;
import com.adobe.granite.socketio.impl.Packet;
import com.adobe.granite.socketio.impl.PacketType;
import com.adobe.granite.socketio.impl.SocketIONamespaceImpl;
import com.adobe.granite.socketio.impl.SocketIOServiceImpl;
import java.io.IOException;
import java.security.Principal;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledFuture;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.sling.commons.json.JSONArray;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SocketIOSocketImpl
implements SocketIOSocket {
    private static final SocketIOSocketListener NOP_LISTENER = new SocketIOSocketListener(){

        @Override
        public void onEvent(@Nonnull String name, @Nonnull JSONArray data, @Nullable SocketIOAck ack) {
        }

        @Override
        public void onDisconnect(String reason) {
        }

        @Override
        public void onError(JSONArray data) {
        }
    };
    private static final Logger log = LoggerFactory.getLogger(SocketIOSocketImpl.class);
    private final SocketIOEmitter emitter;
    private final Client client;
    private final String sig;
    private final SocketIONamespaceImpl namespace;
    private final Set<String> rooms = new HashSet<String>();
    private final ConcurrentMap<Long, AckRequest> ackRequests = new ConcurrentHashMap<Long, AckRequest>();
    private SocketIOSocketListener listener = NOP_LISTENER;

    public SocketIOSocketImpl(Client client, SocketIONamespaceImpl namespace) {
        this.client = client;
        this.namespace = namespace;
        this.emitter = namespace.createEmitter(client.getId(), true);
        this.sig = client.getId() + "@" + namespace.getName();
    }

    public void onConnect() {
        log.debug("[{}] socket connected - writing packet", (Object)this.sig);
        this.join(this.client.getId());
        try {
            this.client.send(new Packet(PacketType.CONNECT, this.namespace.getName(), -1, null, 0));
        }
        catch (IOException e) {
            log.error("[{}] error sending packet", (Object)this.sig, (Object)e);
        }
        this.namespace.bind(this);
        this.client.getSocketIOService().onConnect(this);
    }

    private void onError(Packet p) {
        try {
            this.listener.onError(p.data);
        }
        catch (Exception e) {
            log.error("listener threw exception", (Throwable)e);
        }
    }

    private void onAck(Packet p) {
        AckRequest ackRequest = this.ackRequests.remove(p.id);
        if (ackRequest == null) {
            log.warn("[{}] Received illegal ack response with id={}.", (Object)this.sig, (Object)p.id);
        } else {
            ackRequest.onAck(p.data);
        }
    }

    private void onEvent(Packet p) {
        if (p.data == null || p.data.length() == 0) {
            log.error("[{}] Invalid payload. data array missing or empty for type: {}", (Object)this.sig, (Object)p.type);
            return;
        }
        JSONArray args = new JSONArray();
        String name = p.data.optString(0);
        for (int i = 1; i < p.data.length(); ++i) {
            args.put(p.data.opt(i));
        }
        AckImpl ack = p.id >= 0 ? new AckImpl(p.id) : null;
        try {
            this.listener.onEvent(name, args, ack);
        }
        catch (Exception e) {
            log.error("listener threw exception", (Throwable)e);
        }
        if (ack != null && !ack.sent) {
            log.warn("[{}] client requested ack response for event '{}' which was not handled.", (Object)name, (Object)this.sig);
        }
    }

    private void onDisconnect() {
        log.debug("[{}] got disconnect packet", (Object)this.sig);
        this.onClose("client namespace disconnect");
    }

    private void onClose(String reason) {
        log.debug("[{}] closing socket. reason: {}", (Object)this.sig, (Object)reason);
        this.leaveAll();
        this.namespace.unbind(this);
        this.client.detach(this);
        try {
            this.listener.onDisconnect(reason);
        }
        catch (Exception e) {
            log.error("listener threw exception", (Throwable)e);
        }
        for (AckRequest ack : this.ackRequests.values()) {
            ack.close();
        }
        this.ackRequests.clear();
        this.client.getSocketIOService().onDisconnect(this, reason);
    }

    public void onPacket(Packet p) {
        log.trace("[{}] incoming packet {}: {}", new Object[]{this.sig, p.type, p.data});
        switch (p.type) {
            case DISCONNECT: {
                this.onDisconnect();
                break;
            }
            case EVENT: {
                this.onEvent(p);
                break;
            }
            case ACK: {
                this.onAck(p);
                break;
            }
            case ERROR: {
                this.onError(p);
                break;
            }
            case BINARY_EVENT: {
                break;
            }
            case BINARY_ACK: {
                break;
            }
            default: {
                log.error("[{}] Unexpected packet type: {}", (Object)this.sig, (Object)p.type);
            }
        }
    }

    public void send(Packet p) throws IOException {
        this.client.send(p);
    }

    @Override
    public SocketIOSocket setListener(SocketIOSocketListener listener) {
        if (listener == null) {
            throw new NullPointerException();
        }
        if (this.listener != NOP_LISTENER) {
            throw new IllegalArgumentException("Overriding existing listener not allowed.");
        }
        this.listener = listener;
        return this;
    }

    protected boolean hasListener() {
        return this.listener != NOP_LISTENER;
    }

    @Override
    public SocketIOSocket removeListener(SocketIOSocketListener listener) {
        if (this.listener != listener && this.listener != NOP_LISTENER) {
            throw new IllegalArgumentException("Unable to remove foreign listener " + listener);
        }
        this.listener = NOP_LISTENER;
        return this;
    }

    @Override
    public SocketIOSocket disconnect(boolean close) {
        if (close) {
            this.client.disconnect();
        } else {
            try {
                this.client.send(new Packet(PacketType.DISCONNECT, this.namespace.getName(), -1, null, 0));
            }
            catch (IOException e) {
                log.error("[{}] error sending packet", (Object)this.sig, (Object)e);
            }
            this.onClose("server namespace disconnect");
        }
        return this;
    }

    @Override
    public String getId() {
        return this.client.getId();
    }

    @Override
    public SocketIOEmitter broadcast() {
        return this.namespace.createEmitter(this.getId(), false);
    }

    @Override
    public SocketIOSocket leave(String name) {
        this.rooms.remove(name);
        this.namespace.leave(this, name);
        return this;
    }

    private SocketIOSocket leaveAll() {
        for (String room : this.rooms) {
            this.namespace.leave(this, room);
        }
        this.rooms.clear();
        return this;
    }

    @Override
    public SocketIOSocket join(String name) {
        this.rooms.add(name);
        this.namespace.join(this, name);
        return this;
    }

    @Override
    public Set<String> getRooms() {
        return Collections.unmodifiableSet(this.rooms);
    }

    @Override
    public SocketIONamespace getNamespace() {
        return this.namespace;
    }

    @Nonnull
    @Override
    public /* varargs */ SocketIOSocket emit(@Nonnull String eventName, @Nonnull SocketIOAckListener ackListener, @Nonnull Object ... arguments) throws IOException {
        JSONArray data = new JSONArray();
        data.put((Object)eventName);
        for (Object arg : arguments) {
            data.put(arg);
        }
        return this.emit(eventName, ackListener, data);
    }

    @Nonnull
    @Override
    public SocketIOSocket emit(@Nonnull String name, @Nonnull SocketIOAckListener ack, @Nonnull JSONArray data) throws IOException {
        AckRequest ackRequest;
        while (this.ackRequests.putIfAbsent((ackRequest = new AckRequest(this.ackRequests.size(), ack)).ackId, ackRequest) != null) {
        }
        log.debug("[{}] Scheduling callback timeout for ack id: {}", (Object)ackRequest.ackId, (Object)this.sig);
        ackRequest.timeoutHandle = this.client.getSocketIOService().scheduleCallbackTimeout(ackRequest, ack.getCallbackTimeout());
        this.send(new Packet(PacketType.EVENT, this.getNamespace().getName(), ackRequest.ackId, data, 0));
        return this;
    }

    @Override
    public /* varargs */ SocketIOEmitter emit(String eventName, Object ... arguments) throws IOException {
        return this.emitter.emit(eventName, arguments);
    }

    @Override
    public SocketIOEmitter emit(String eventName, JSONArray arguments) throws IOException {
        return this.emitter.emit(eventName, arguments);
    }

    @Override
    public /* varargs */ SocketIOEmitter to(String ... room) {
        return this.emitter.to(room);
    }

    @Override
    public Principal getUserPrincipal() {
        return this.client.getUserPrincipal();
    }

    private class AckRequest
    implements Runnable {
        private final long ackId;
        private final SocketIOAckListener listener;
        private ScheduledFuture<?> timeoutHandle;

        private AckRequest(long ackId, SocketIOAckListener listener) {
            this.ackId = ackId;
            this.listener = listener;
        }

        private void onAck(@Nonnull JSONArray data) {
            log.debug("[{}] Ack event received for ack id: {}", (Object)SocketIOSocketImpl.this.sig, (Object)this.ackId);
            this.timeoutHandle.cancel(false);
            try {
                this.listener.onAck(data);
            }
            catch (Exception e) {
                log.error("listener threw exception", (Throwable)e);
            }
        }

        private void close() {
            this.timeoutHandle.cancel(true);
        }

        @Override
        public void run() {
            log.debug("[{}] Callback timeout for ack id: {}", (Object)SocketIOSocketImpl.this.sig, (Object)this.ackId);
            SocketIOSocketImpl.this.ackRequests.remove(this.ackId);
            try {
                this.listener.onTimeout();
            }
            catch (Exception e) {
                log.error("listener threw exception", (Throwable)e);
            }
        }
    }

    private class AckImpl
    implements SocketIOAck {
        private final long ackId;
        private boolean sent;

        private AckImpl(long ackId) {
            this.ackId = ackId;
        }

        @Override
        public boolean isAcknowledged() {
            return this.sent;
        }

        @Override
        public /* varargs */ void send(@Nonnull Object ... arguments) throws IOException {
            JSONArray data = new JSONArray();
            for (Object arg : arguments) {
                data.put(arg);
            }
            this.send(data);
        }

        @Override
        public void send(@Nonnull JSONArray data) throws IOException {
            if (this.sent) {
                throw new IllegalStateException("Acknowledge packet already sent");
            }
            Packet p = new Packet(PacketType.ACK, SocketIOSocketImpl.this.namespace.getName(), this.ackId, data, 0);
            SocketIOSocketImpl.this.client.send(p);
            this.sent = true;
        }
    }

}