package com.kedacom.uc.transmit.socket;

import android.annotation.SuppressLint;
import androidx.annotation.NonNull;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import com.kedacom.basic.common.util.BeanFlattenUtil;
import com.kedacom.basic.common.util.LongUtil;
import com.kedacom.basic.common.util.Optional;
import com.kedacom.basic.common.util.StringUtil;
import com.kedacom.basic.http.retrofit.request.RequestBuilder;
import com.kedacom.basic.log.LogConstant;
import com.kedacom.uc.common.cache.ContextProvider;
import com.kedacom.uc.common.http.protocol.response.TransmitHttpSnapshotResp;
import com.kedacom.uc.common.logic.RxRetryProcess;
import com.kedacom.uc.common.rx.ScheduleTransformer;
import com.kedacom.uc.common.rx.TransmitSnapshotHttpHandleFuc;
import com.kedacom.uc.sdk.auth.model.IAccount;
import com.kedacom.uc.sdk.bean.basic.ResultCode;
import com.kedacom.uc.sdk.bean.pageable.SnapshotResult;
import com.kedacom.uc.sdk.bean.transmit.DefaultSignalMessage;
import com.kedacom.uc.sdk.bean.transmit.response.JsonSignalRespBody;
import com.kedacom.uc.sdk.exception.ResponseException;
import com.kedacom.uc.sdk.generic.constant.MsgAscription;
import com.kedacom.uc.sdk.impl.SdkImpl;
import com.kedacom.uc.sdk.rx.RxHelper;
import com.kedacom.uc.transmit.socket.bean.ClientLatestMessageRecord;
import com.kedacom.uc.transmit.socket.bean.OfflineMessageLocalCache;
import com.kedacom.uc.transmit.socket.bean.PacketLossIntervalRecords;
import com.kedacom.uc.transmit.socket.e.a.bo;
import com.kedacom.uc.transmit.socket.e.a.p;
import com.kedacom.uc.transmit.socket.http.TransparentOfflineMsgReq;
import com.kedacom.uc.transmit.socket.http.b;
import com.kedacom.uc.transmit.socket.j.c;
import com.kedacom.uc.transmit.socket.m.g;
import com.kedacom.uc.transmit.socket.n.a;
import fisec.w0;
import io.reactivex.Observable;
import io.reactivex.ObservableSource;
import io.reactivex.Observer;
import io.reactivex.Scheduler;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.functions.Predicate;
import io.reactivex.schedulers.Schedulers;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: classes5.dex */
public class PacketLossApi {
    // Shared Gson used by processMessage() to serialize and by reSendLocalMsgCache() to deserialize cached messages.
    private static final Gson gson = new Gson();
    // Lazily created by getInstance(); reset to null at the end of release().
    private static PacketLossApi instance;
    // Subscription of the loop started by reSendLocalMsgCache(); disposed in release().
    private Disposable pullDataDis;
    // 7,776,000,000 ms = 90 days. clearMsgExpired() subtracts the same literal from the current time.
    private final long EXPIRED_TIME = 7776000000L;
    // Same value as w0.B, which reSendLocalMsgCache() passes to Object.wait(long) as a millisecond timeout.
    private final long WAIT_SPACE = w0.B;
    // Monitor: reSendLocalMsgCache() waits on it; processMessage() and onChannelActive() notify it.
    private final Object pullHolder = new Object();
    private Logger logger = LoggerFactory.getLogger(LogConstant.DEFAULT_PACKETLOSS_LOGGER);
    // Collects the subscriptions created by the pull methods so release() can dispose them together.
    private CompositeDisposable compositeDisposable = new CompositeDisposable();
    // Full generic type of a signal message, needed by Gson because generics are erased at runtime.
    private Type type = new TypeToken<DefaultSignalMessage<JsonSignalRespBody>>() { // from class: com.kedacom.uc.transmit.socket.PacketLossApi.1
    }.getType();
    // Five-thread pool backing `scheduler`; shut down in release().
    ExecutorService threadPoolExecutor = Executors.newFixedThreadPool(5);
    // Rx scheduler the pull pipelines subscribe and observe on.
    Scheduler scheduler = Schedulers.from(this.threadPoolExecutor);

    /** Private: the only construction site in this class is {@link #getInstance()}. */
    private PacketLossApi() {
    }

    /**
     * Returns the shared instance, creating it on first use (or on first use after {@link #release()}).
     *
     * <p>Synchronized so that two threads racing through the {@code null} check cannot each build an
     * instance; every instance allocates its own five-thread executor, so a duplicate would leak threads.
     *
     * @return the current singleton, never {@code null}
     */
    public static synchronized PacketLossApi getInstance() {
        if (instance == null) {
            instance = new PacketLossApi();
        }
        return instance;
    }

    /**
     * Starts a pull for the messages described by one loss record.
     *
     * <p>Declared private in the original source; the decompiler widened it because anonymous inner classes call it.
     *
     * @param z                         when {@code true} the record is first handed to {@code c.a().a(...)}
     *                                  (the accompanying log line describes this as adding it to the db)
     * @param packetLossIntervalRecords the loss interval to recover
     * @param str                       correlation id used in log lines
     */
    public void getLostMsg(boolean z, PacketLossIntervalRecords packetLossIntervalRecords, String str) {
        if (z) {
            c.a().a(packetLossIntervalRecords);
            this.logger.info("{} >> add new lost record to db >>  {}", str, packetLossIntervalRecords);
        }
        // A record with a max snapshot is pulled as a bounded range; one without is pulled by count.
        final boolean hasMaxSnapshot = !StringUtil.isEmpty(packetLossIntervalRecords.getMaxSnapshot());
        if (hasMaxSnapshot) {
            getLostMsgWithMaxSnapshot(str, packetLossIntervalRecords);
            return;
        }
        getLostMsgWithLimit(str, packetLossIntervalRecords);
    }

    /**
     * Pulls exactly {@code getLimit()} lost messages starting from the record's snapshot.
     *
     * <p>The local record is removed (via {@code c.a().b(...)}) and the messages are forwarded to
     * {@link #processMessage} only when the response carries exactly {@code limit} entries; otherwise
     * the record is left in place for a later attempt.
     *
     * @param str                       correlation id used in log lines
     * @param packetLossIntervalRecords the loss interval to recover
     */
    private void getLostMsgWithLimit(final String str, final PacketLossIntervalRecords packetLossIntervalRecords) {
        final IAccount account = SdkImpl.getInstance().getUserSession().orNull();
        final String token = account != null ? account.getToken() : "";
        final int limit = packetLossIntervalRecords.getLimit();
        final TransparentOfflineMsgReq request = TransparentOfflineMsgReq.build(packetLossIntervalRecords.getRecordAscription(), token, limit, packetLossIntervalRecords.getSnapshot());

        // Side-effecting step: decides whether the record is complete and, if so, caches the messages.
        final Consumer<Optional<SnapshotResult<DefaultSignalMessage<JsonSignalRespBody>>>> handleResponse = new Consumer<Optional<SnapshotResult<DefaultSignalMessage<JsonSignalRespBody>>>>() {
            @Override
            public void accept(Optional<SnapshotResult<DefaultSignalMessage<JsonSignalRespBody>>> response) throws Exception {
                if (!response.isPresent()) {
                    return;
                }
                PacketLossApi.this.logger.info("getLostMsgWithLimit({})  the resp is {}", str, response);
                final List<DefaultSignalMessage<JsonSignalRespBody>> messages = response.get().getData();
                final boolean allPulledBack = messages != null && messages.size() == limit;
                if (!allPulledBack) {
                    PacketLossApi.this.logger.info("getLostMsgWithLimit({}) : the lost msg is not all pull back ,so the local record [[{}]] is wait for resend", str, packetLossIntervalRecords);
                    return;
                }
                c.a().b(packetLossIntervalRecords);
                PacketLossApi.this.logger.info("getLostMsgWithLimit({}) : the lost msg has all pull back ,so remove local record [[{}]]", str, packetLossIntervalRecords);
                PacketLossApi.this.processMessage(messages, MsgAscription.valueOf(packetLossIntervalRecords.getRecordAscription()));
            }
        };

        // Terminal observer: only logs, and registers the subscription for release().
        final Observer<Optional<SnapshotResult<DefaultSignalMessage<JsonSignalRespBody>>>> sink = new Observer<Optional<SnapshotResult<DefaultSignalMessage<JsonSignalRespBody>>>>() {
            @Override
            public void onSubscribe(Disposable disposable) {
                PacketLossApi.this.compositeDisposable.add(disposable);
            }

            @Override
            public void onNext(Optional<SnapshotResult<DefaultSignalMessage<JsonSignalRespBody>>> response) {
                PacketLossApi.this.logger.debug("getLostMsgWithLimit on next .");
            }

            @Override
            public void onError(Throwable th) {
                PacketLossApi.this.logger.error("getLostMsgWithLimit({})  the resp error = {}", str, th.toString());
            }

            @Override
            public void onComplete() {
            }
        };

        packetLossReq(request)
                .retryWhen(new RxRetryProcess(true, 3, 10000L))
                .doOnNext(handleResponse)
                .subscribeOn(this.scheduler)
                .observeOn(this.scheduler)
                .subscribe(sink);
    }

    /**
     * Pulls the lost messages of a record that carries an upper bound ({@code maxSnapshot}), page by page.
     *
     * <p>The record's snapshot is first stored through the four-argument {@code a.a(...)} helper (a log
     * line elsewhere in this class names that call "updateStartSnapshotForMaxSnapshot"). The request is
     * then re-subscribed with {@code repeat()} until a response is absent, has no data, or holds fewer
     * than {@code getLimit()} entries.
     *
     * <p>NOTE(review): the local record and its stored start snapshot are removed on the first page
     * whose data is non-null, even if further pages follow — confirm this is intended.
     *
     * @param str                       correlation id used in log lines
     * @param packetLossIntervalRecords the loss interval to recover
     */
    private void getLostMsgWithMaxSnapshot(final String str, final PacketLossIntervalRecords packetLossIntervalRecords) {
        a.a(ContextProvider.gContext, packetLossIntervalRecords.getMaxSnapshot(), packetLossIntervalRecords.getSnapshot(), MsgAscription.valueOf(packetLossIntervalRecords.getRecordAscription()));
        packetLossReqWithMaxSnapshot(packetLossIntervalRecords).repeat().takeUntil(new Predicate<Optional<SnapshotResult<DefaultSignalMessage<JsonSignalRespBody>>>>() {
            @Override // io.reactivex.functions.Predicate
            public boolean test(Optional<SnapshotResult<DefaultSignalMessage<JsonSignalRespBody>>> optional) {
                // Stop repeating once a page is missing, empty of data, or shorter than a full page.
                return !optional.isPresent() || optional.get().getData() == null || optional.get().getData().size() < packetLossIntervalRecords.getLimit();
            }
        }).retryWhen(new RxRetryProcess(true, 3, 10000L)).doOnNext(new Consumer<Optional<SnapshotResult<DefaultSignalMessage<JsonSignalRespBody>>>>() {
            @Override // io.reactivex.functions.Consumer
            public void accept(Optional<SnapshotResult<DefaultSignalMessage<JsonSignalRespBody>>> optional) throws Exception {
                if (optional.isPresent()) {
                    PacketLossApi.this.logger.info("getLostMsgWithMaxSnapshot({})  the resp is {}", str, optional);
                    List<DefaultSignalMessage<JsonSignalRespBody>> data = optional.get().getData();
                    if (data == null) {
                        PacketLossApi.this.logger.info("getLostMsgWithMaxSnapshot({}) : the lost msg is not all pull back ,so the local record [[{}]] is wait for resend", str, packetLossIntervalRecords);
                        return;
                    }
                    c.a().b(packetLossIntervalRecords);
                    a.b(ContextProvider.gContext, packetLossIntervalRecords.getMaxSnapshot(), MsgAscription.valueOf(packetLossIntervalRecords.getRecordAscription()));
                    PacketLossApi.this.logger.info("getLostMsgWithMaxSnapshot({}) : the lost msg has all pull back ,so remove local record [[{}]]", str, packetLossIntervalRecords);
                    PacketLossApi.this.processMessage(data, MsgAscription.valueOf(packetLossIntervalRecords.getRecordAscription()));
                }
            }
        }).subscribeOn(this.scheduler).observeOn(this.scheduler).subscribe(new Observer<Optional<SnapshotResult<DefaultSignalMessage<JsonSignalRespBody>>>>() {
            @Override // io.reactivex.Observer
            public void onComplete() {
            }

            @Override // io.reactivex.Observer
            public void onError(Throwable th) {
                // The single placeholder is the correlation id; the trailing Throwable is logged with its stack trace.
                PacketLossApi.this.logger.error("getLostMsgWithMaxSnapshot({})  the resp error", str, th);
            }

            @Override // io.reactivex.Observer
            public void onNext(Optional<SnapshotResult<DefaultSignalMessage<JsonSignalRespBody>>> optional) {
                // Routine progress event: debug level, matching the sibling pull methods.
                PacketLossApi.this.logger.debug("getLostMsgWithMaxSnapshot on next.");
            }

            @Override // io.reactivex.Observer
            public void onSubscribe(Disposable disposable) {
                PacketLossApi.this.compositeDisposable.add(disposable);
            }
        });
    }

    /**
     * Pulls offline messages for one ascription, page by page, until the server reports no next page.
     *
     * <p>Before pulling, the snapshot of the latest message record (from {@code j.a}) is compared with
     * the snapshot stored through the {@code a} helper; if the former is larger it overwrites the stored
     * one, so the pull starts from the later position.
     *
     * @param msgAscription which message stream to pull
     * @param str           correlation id used in log lines
     */
    private void getOfflineMsg(final MsgAscription msgAscription, final String str) {
        // Must not be named `a`: a local of that name would obscure the imported helper class `a`
        // and the static calls below would resolve against the record instead.
        ClientLatestMessageRecord latestRecord = com.kedacom.uc.transmit.socket.j.a.a().a(msgAscription);
        if (latestRecord != null) {
            long lastOnlineSnapshot = LongUtil.convertStringToLong(latestRecord.getSnapshot());
            long lastOfflineSnapshot = LongUtil.convertStringToLong(a.a(ContextProvider.gContext, msgAscription));
            if (lastOnlineSnapshot > lastOfflineSnapshot) {
                a.a(ContextProvider.gContext, String.valueOf(lastOnlineSnapshot), msgAscription);
                this.logger.info("getOfflineMsg({}) , the last online msg snapshot({}) is bigger than the last offline msg snapshot({}) ,so update", str, Long.valueOf(lastOnlineSnapshot), Long.valueOf(lastOfflineSnapshot));
            }
        }
        offlinereq(msgAscription).repeat().takeUntil(new Predicate<Optional<SnapshotResult<DefaultSignalMessage<JsonSignalRespBody>>>>() {
            @Override // io.reactivex.functions.Predicate
            public boolean test(Optional<SnapshotResult<DefaultSignalMessage<JsonSignalRespBody>>> optional) {
                // Keep repeating only while a response is present and announces another page.
                return !(optional.isPresent() && optional.get().getHasNext());
            }
        }).retryWhen(new RxRetryProcess(true, 3, 10000L)).doOnNext(new Consumer<Optional<SnapshotResult<DefaultSignalMessage<JsonSignalRespBody>>>>() {
            @Override // io.reactivex.functions.Consumer
            public void accept(Optional<SnapshotResult<DefaultSignalMessage<JsonSignalRespBody>>> optional) throws Exception {
                if (optional.isPresent()) {
                    PacketLossApi.this.logger.info("getOfflineMsg({})  resp is {}", str, optional);
                    List<DefaultSignalMessage<JsonSignalRespBody>> data = optional.get().getData();
                    if (data == null || data.isEmpty()) {
                        return;
                    }
                    PacketLossApi.this.logger.debug("getOfflineMsg({}) process data size = {}", str, Integer.valueOf(data.size()));
                    PacketLossApi.this.processMessage(data, msgAscription);
                }
            }
        }).subscribeOn(this.scheduler).observeOn(this.scheduler).subscribe(new Observer<Optional<SnapshotResult<DefaultSignalMessage<JsonSignalRespBody>>>>() {
            @Override // io.reactivex.Observer
            public void onComplete() {
            }

            @Override // io.reactivex.Observer
            public void onError(Throwable th) {
                // The old format ended in full-width braces, which SLF4J prints literally; the
                // trailing Throwable is logged with its stack trace without needing a placeholder.
                PacketLossApi.this.logger.error("getOfflineMsg({})  error", str, th);
            }

            @Override // io.reactivex.Observer
            public void onNext(Optional<SnapshotResult<DefaultSignalMessage<JsonSignalRespBody>>> optional) {
                PacketLossApi.this.logger.debug("getOfflineMsg on next.");
            }

            @Override // io.reactivex.Observer
            public void onSubscribe(Disposable disposable) {
                PacketLossApi.this.compositeDisposable.add(disposable);
            }
        });
    }

    /**
     * Builds a cold observable that performs one offline-message request for {@code msgAscription}.
     *
     * <p>Each subscription reads the stored start snapshot (falling back to {@code "0"}), requests up
     * to 200 messages from there, and, when the response carries a non-empty snapshot string, stores
     * that snapshot through the {@code a} helper. Because the start snapshot is read per subscription,
     * re-subscribing continues from the newly stored position.
     *
     * @param msgAscription which message stream to query
     * @return observable emitting the converted response; errors are routed through {@code g}
     */
    private Observable<Optional<SnapshotResult<DefaultSignalMessage<JsonSignalRespBody>>>> offlinereq(final MsgAscription msgAscription) {
        // Step 1: read the persisted start position at subscription time.
        final Callable<Optional<String>> readStartSnapshot = new Callable<Optional<String>>() {
            @Override
            public Optional<String> call() {
                final String stored = a.a(ContextProvider.gContext, msgAscription);
                PacketLossApi.this.logger.info("offline req snap -> get the start snapshot = {} ,context = {},for offline req", stored, ContextProvider.gContext);
                return Optional.ofNullable(stored);
            }
        };

        // Step 2: turn the start position into an HTTP request.
        final Function<Optional<String>, ObservableSource<TransmitHttpSnapshotResp<DefaultSignalMessage<JsonSignalRespBody>>>> sendRequest = new Function<Optional<String>, ObservableSource<TransmitHttpSnapshotResp<DefaultSignalMessage<JsonSignalRespBody>>>>() {
            @Override
            public ObservableSource<TransmitHttpSnapshotResp<DefaultSignalMessage<JsonSignalRespBody>>> apply(@NonNull Optional<String> stored) {
                String startSnapshot = "0";
                if (stored.isPresent() && StringUtil.isNotEmpty(stored.get())) {
                    startSnapshot = stored.get();
                }
                final IAccount account = SdkImpl.getInstance().getUserSession().orNull();
                final String token = account != null ? account.getToken() : "";
                final TransparentOfflineMsgReq request = TransparentOfflineMsgReq.build(msgAscription.getValue(), token, 200, startSnapshot);
                PacketLossApi.this.logger.debug("offline req snap -> start get offline msg ,param = {}", request.toString());
                return ((b) new RequestBuilder().gjson(b.class)).a(BeanFlattenUtil.deepToFlattenStringMap(request));
            }
        };

        // Step 3: persist the snapshot returned by the server so the next request starts after it.
        final Consumer<Optional<SnapshotResult<DefaultSignalMessage<JsonSignalRespBody>>>> saveSnapshot = new Consumer<Optional<SnapshotResult<DefaultSignalMessage<JsonSignalRespBody>>>>() {
            @Override
            public void accept(Optional<SnapshotResult<DefaultSignalMessage<JsonSignalRespBody>>> response) {
                if (response.isPresent() && StringUtil.isNotEmpty(response.get().getSnapshotStr())) {
                    final String nextSnapshot = response.get().getSnapshotStr();
                    final boolean saved = a.a(ContextProvider.gContext, nextSnapshot, msgAscription);
                    if (saved) {
                        PacketLossApi.this.logger.info("offline req snap -> save snapshot = {} ,success", nextSnapshot);
                    } else {
                        PacketLossApi.this.logger.info("offline req snap -> save snapshot = {} ,failed", nextSnapshot);
                    }
                } else {
                    PacketLossApi.this.logger.error("offline req snap -> save snapshot failed due to resp is null or snap is empty");
                }
            }
        };

        return Observable.fromCallable(readStartSnapshot)
                .flatMap(sendRequest)
                .map(new TransmitSnapshotHttpHandleFuc())
                .doOnNext(saveSnapshot)
                .onErrorResumeNext(new g<Optional<SnapshotResult<DefaultSignalMessage<JsonSignalRespBody>>>>() {
                });
    }

    /**
     * Builds a cold observable that sends one lost-message request.
     *
     * <p>Requests whose limit is 200 or more are filtered out, so the observable completes without
     * emitting for them. Errors of the HTTP call are routed through {@code g}.
     *
     * @param transparentOfflineMsgReq the prepared request
     * @return observable emitting the converted response, or nothing when the request was filtered out
     */
    private Observable<Optional<SnapshotResult<DefaultSignalMessage<JsonSignalRespBody>>>> packetLossReq(final TransparentOfflineMsgReq transparentOfflineMsgReq) {
        // Only the single value passed to just() ever reaches these callbacks, so the
        // callback argument and the captured parameter are the same object.
        final Predicate<TransparentOfflineMsgReq> belowPageSize = new Predicate<TransparentOfflineMsgReq>() {
            @Override
            public boolean test(TransparentOfflineMsgReq candidate) throws Exception {
                return candidate.getLimit().intValue() < 200;
            }
        };
        final Function<TransparentOfflineMsgReq, ObservableSource<Optional<SnapshotResult<DefaultSignalMessage<JsonSignalRespBody>>>>> execute = new Function<TransparentOfflineMsgReq, ObservableSource<Optional<SnapshotResult<DefaultSignalMessage<JsonSignalRespBody>>>>>() {
            @Override
            public ObservableSource<Optional<SnapshotResult<DefaultSignalMessage<JsonSignalRespBody>>>> apply(TransparentOfflineMsgReq request) throws Exception {
                return ((b) new RequestBuilder().gjson(b.class))
                        .a(BeanFlattenUtil.deepToFlattenStringMap(request))
                        .map(new TransmitSnapshotHttpHandleFuc())
                        .onErrorResumeNext(new g<Optional<SnapshotResult<DefaultSignalMessage<JsonSignalRespBody>>>>() {
                        });
            }
        };
        return Observable.just(transparentOfflineMsgReq).filter(belowPageSize).flatMap(execute);
    }

    /**
     * Builds a cold observable that requests one page of lost messages bounded by the record's
     * {@code maxSnapshot}.
     *
     * <p>Each subscription reads the current start snapshot for that bound via {@code a.c(...)},
     * sends the request, and, when the response carries a non-empty snapshot string, stores it as
     * the new start snapshot via the four-argument {@code a.a(...)}.
     *
     * @param packetLossIntervalRecords the loss interval to recover
     * @return observable emitting the converted response; errors (including a missing start
     *         snapshot) are routed through {@code g}
     */
    private Observable<Optional<SnapshotResult<DefaultSignalMessage<JsonSignalRespBody>>>> packetLossReqWithMaxSnapshot(final PacketLossIntervalRecords packetLossIntervalRecords) {
        return Observable.fromCallable(new Callable<Optional<String>>() {
            @Override // java.util.concurrent.Callable
            public Optional<String> call() {
                return Optional.ofNullable(a.c(ContextProvider.gContext, packetLossIntervalRecords.getMaxSnapshot(), MsgAscription.valueOf(packetLossIntervalRecords.getRecordAscription())));
            }
        }).flatMap(new Function<Optional<String>, ObservableSource<TransmitHttpSnapshotResp<DefaultSignalMessage<JsonSignalRespBody>>>>() {
            @Override // io.reactivex.functions.Function
            public ObservableSource<TransmitHttpSnapshotResp<DefaultSignalMessage<JsonSignalRespBody>>> apply(Optional<String> optional) {
                if (!optional.isPresent()) {
                    // A flatMap mapper must not return null. RxJava would turn a null into a
                    // NullPointerException with a generic message; signal the same exception type
                    // explicitly so downstream handlers see no change but the log names the cause.
                    return Observable.error(new NullPointerException("no start snapshot stored for max snapshot " + packetLossIntervalRecords.getMaxSnapshot()));
                }
                String startSnapshot = optional.get();
                PacketLossApi.this.logger.debug("display max snapshot : {}", startSnapshot);
                IAccount orNull = SdkImpl.getInstance().getUserSession().orNull();
                return ((b) new RequestBuilder().gjson(b.class)).a(BeanFlattenUtil.deepToFlattenStringMap(TransparentOfflineMsgReq.build(packetLossIntervalRecords.getRecordAscription(), orNull == null ? "" : orNull.getToken(), packetLossIntervalRecords.getLimit(), startSnapshot, packetLossIntervalRecords.getMaxSnapshot(), packetLossIntervalRecords.getNodeName())));
            }
        }).map(new TransmitSnapshotHttpHandleFuc()).doOnNext(new Consumer<Optional<SnapshotResult<DefaultSignalMessage<JsonSignalRespBody>>>>() {
            @Override // io.reactivex.functions.Consumer
            public void accept(Optional<SnapshotResult<DefaultSignalMessage<JsonSignalRespBody>>> optional) {
                if (optional.isPresent() && StringUtil.isNotEmpty(optional.get().getSnapshotStr())) {
                    String snapshotStr = optional.get().getSnapshotStr();
                    // Advance the stored start position so a re-subscription requests the next page.
                    a.a(ContextProvider.gContext, packetLossIntervalRecords.getMaxSnapshot(), snapshotStr, MsgAscription.valueOf(packetLossIntervalRecords.getRecordAscription()));
                    PacketLossApi.this.logger.info("PacketLossContext.updateStartSnapshotForMaxSnapshot ={}", snapshotStr);
                }
            }
        }).onErrorResumeNext(new g<Optional<SnapshotResult<DefaultSignalMessage<JsonSignalRespBody>>>>() {
        });
    }

    /**
     * Converts pulled messages into {@link OfflineMessageLocalCache} rows, hands them to
     * {@code j.b} in one batch, and then notifies {@code pullHolder} so a waiting re-send loop wakes up.
     *
     * <p>Messages for which {@code bo.a(body)} yields an empty string are skipped; the rest are
     * flagged as loss packets before being serialized.
     *
     * <p>Declared private in the original source; the decompiler widened it because anonymous inner classes call it.
     *
     * @param list          messages returned by the server; callers in this class pass non-null lists
     * @param msgAscription ascription stored alongside each cached message
     */
    public void processMessage(List<DefaultSignalMessage<JsonSignalRespBody>> list, final MsgAscription msgAscription) {
        this.compositeDisposable.add(Observable.fromIterable(list).filter(new Predicate<DefaultSignalMessage<JsonSignalRespBody>>() {
            @Override // io.reactivex.functions.Predicate
            public boolean test(DefaultSignalMessage<JsonSignalRespBody> defaultSignalMessage) {
                return StringUtil.isNotEmpty(bo.a(defaultSignalMessage.getBody()));
            }
        }).map(new Function<DefaultSignalMessage<JsonSignalRespBody>, DefaultSignalMessage<JsonSignalRespBody>>() {
            @Override // io.reactivex.functions.Function
            public DefaultSignalMessage<JsonSignalRespBody> apply(DefaultSignalMessage<JsonSignalRespBody> defaultSignalMessage) {
                defaultSignalMessage.setLossPacket(true);
                return defaultSignalMessage;
            }
        }).map(new Function<DefaultSignalMessage<JsonSignalRespBody>, OfflineMessageLocalCache>() {
            @Override // io.reactivex.functions.Function
            public OfflineMessageLocalCache apply(DefaultSignalMessage<JsonSignalRespBody> defaultSignalMessage) {
                return new OfflineMessageLocalCache(bo.a(defaultSignalMessage.getBody()), PacketLossApi.gson.toJson(defaultSignalMessage, PacketLossApi.this.type), msgAscription.getValue(), defaultSignalMessage.getHeader().getTime());
            }
        }).toList().subscribe(new Consumer<List<OfflineMessageLocalCache>>() {
            @Override // io.reactivex.functions.Consumer
            public void accept(List<OfflineMessageLocalCache> list2) {
                com.kedacom.uc.transmit.socket.j.b.a().a(list2);
                PacketLossApi.this.logger.debug("add to db is over ");
                synchronized (PacketLossApi.this.pullHolder) {
                    PacketLossApi.this.pullHolder.notify();
                }
            }
        }, new Consumer<Throwable>() {
            @Override // io.reactivex.functions.Consumer
            public void accept(Throwable th) {
                // Without an error consumer a failure in any step above (e.g. a message lacking a
                // header) would surface as OnErrorNotImplementedException on the global Rx handler.
                PacketLossApi.this.logger.error("processMessage failed to cache pulled messages", th);
            }
        }));
    }

    /**
     * Re-issues a pull for every loss record returned by {@code c.a().b()}, without re-adding the
     * records (they are passed to {@link #getLostMsg(boolean, PacketLossIntervalRecords, String)}
     * with {@code false}).
     *
     * @param str correlation id used in log lines
     */
    private void rePullDataFromServerByLocalPacketLossInterval(final String str) {
        Observable.fromCallable(new Callable<Optional<Void>>() {
            @Override // java.util.concurrent.Callable
            public Optional<Void> call() throws Exception {
                for (PacketLossIntervalRecords packetLossIntervalRecords : c.a().b()) {
                    PacketLossApi.this.logger.info("{} >>rePullDataFromServerByLocalPacketLossInterval >> the record in local is {}", str, packetLossIntervalRecords);
                    PacketLossApi.this.getLostMsg(false, packetLossIntervalRecords, str);
                }
                return Optional.absent();
            }
        }).compose(ScheduleTransformer.get()).onErrorResumeNext(new g()).subscribe(RxHelper.NOTHING, new Consumer<Throwable>() {
            @Override // io.reactivex.functions.Consumer
            public void accept(Throwable th) throws Exception {
                // Log the cause at error level, as the other pull methods do; the previous
                // debug-level message dropped the Throwable entirely.
                PacketLossApi.this.logger.error("rePullDataFromServerByLocalPacketLossInterval finish err.", th);
            }
        });
    }

    /**
     * Asks {@code j.b} to purge cached messages older than {@code EXPIRED_TIME} (90 days),
     * measured from {@code ContextProvider.getCurrentTimeMillis()}.
     *
     * <p>Nothing is purged when the computed cutoff is not positive. The cutoff is computed at call time.
     */
    @SuppressLint({"CheckResult"})
    public void clearMsgExpired() {
        final long cutoffMillis = ContextProvider.getCurrentTimeMillis() - this.EXPIRED_TIME;
        final Function<Long, Optional<Void>> purgeBefore = new Function<Long, Optional<Void>>() {
            @Override
            public Optional<Void> apply(Long cutoff) throws Exception {
                final boolean cutoffIsValid = cutoff.longValue() > 0;
                if (cutoffIsValid) {
                    com.kedacom.uc.transmit.socket.j.b.a().a(cutoff);
                }
                return Optional.absent();
            }
        };
        Observable.just(Long.valueOf(cutoffMillis))
                .map(purgeBefore)
                .onErrorResumeNext(new g())
                .subscribe(RxHelper.NOTHING, RxHelper.DEFAULT_EXCEPTION_HANDLER);
    }

    /**
     * Delegates to {@code d()} on the {@code j.b} helper.
     * NOTE(review): presumably resets the marker that reSendLocalMsgCache() sets via {@code b(...)}
     * on messages it has picked up — confirm against the j.b implementation.
     */
    public void clearQueryMark() {
        com.kedacom.uc.transmit.socket.j.b.a().d();
    }

    /**
     * Public entry point for a newly detected loss interval: calls the three-argument overload
     * with {@code true}, so the record is first handed to {@code c.a().a(...)} and then pulled.
     *
     * @param packetLossIntervalRecords the loss interval to recover
     * @param str                       correlation id used in log lines
     */
    public void getLostMsg(PacketLossIntervalRecords packetLossIntervalRecords, String str) {
        getLostMsg(true, packetLossIntervalRecords, str);
    }

    /**
     * Called when the channel becomes active: re-pulls all locally recorded loss intervals, pulls
     * offline messages for the USER and DEV ascriptions, and notifies {@code pullHolder} so a
     * waiting re-send loop wakes up. A fresh UUID tags every log line of this round.
     *
     * <p>Exceptions thrown while starting the pulls are logged and not propagated.
     */
    public void onChannelActive() {
        String uuid = UUID.randomUUID().toString();
        this.logger.info("PacketLossApi >> onChannelActive {}", uuid);
        try {
            rePullDataFromServerByLocalPacketLossInterval(uuid);
            getOfflineMsg(MsgAscription.USER, uuid);
            getOfflineMsg(MsgAscription.DEV, uuid);
            synchronized (this.pullHolder) {
                this.pullHolder.notify();
            }
        } catch (Exception e) {
            // The placeholder carries the round id; the trailing Throwable is logged with its
            // stack trace. The old format left a literal "{}" in the output.
            this.logger.error("onChannelActive({}) error", uuid, e);
        }
    }

    /**
     * Starts an endless loop on the IO scheduler that re-publishes locally cached messages.
     *
     * <p>Each round loads the cached messages from {@code j.b}, passes their snapshots to
     * {@code j.b.a().b(...)}, deserializes every message and publishes it through
     * {@code SignalSocketReq}. When the cache is empty the round fails with
     * {@code PARAM_IS_NULL}; the retry handler then waits on {@code pullHolder} (up to
     * {@code WAIT_SPACE} ms, or until notified) before trying again.
     *
     * <p>Calling this method again replaces the running loop instead of starting a second one.
     */
    public void reSendLocalMsgCache() {
        // Dispose a loop from an earlier call; otherwise its handle is overwritten below and
        // release() could never stop it.
        Disposable previous = this.pullDataDis;
        if (previous != null && !previous.isDisposed()) {
            previous.dispose();
        }
        this.pullDataDis = Observable.just(1).flatMap(new Function<Integer, ObservableSource<Optional<List<OfflineMessageLocalCache>>>>() {
            @Override // io.reactivex.functions.Function
            public ObservableSource<Optional<List<OfflineMessageLocalCache>>> apply(Integer num) throws Exception {
                PacketLossApi.this.logger.debug("print get local cache data id .");
                return com.kedacom.uc.transmit.socket.j.b.a().c();
            }
        }).flatMap(new Function<Optional<List<OfflineMessageLocalCache>>, ObservableSource<OfflineMessageLocalCache>>() {
            @Override // io.reactivex.functions.Function
            public ObservableSource<OfflineMessageLocalCache> apply(Optional<List<OfflineMessageLocalCache>> optional) throws Exception {
                if (!optional.isPresent() || optional.get().isEmpty()) {
                    PacketLossApi.this.logger.debug("print get local cache data empty.");
                    // Signals "nothing to do"; the retry handler below turns this into a timed wait.
                    return Observable.error(new ResponseException(ResultCode.PARAM_IS_NULL));
                }
                List<OfflineMessageLocalCache> cached = optional.get();
                PacketLossApi.this.logger.debug("print get local cache data size : {}", Integer.valueOf(cached.size()));
                // Raw list kept on purpose: the element type expected by j.b.a().b(...) is not visible here.
                ArrayList arrayList = new ArrayList();
                for (OfflineMessageLocalCache item : cached) {
                    arrayList.add(item.getSnapshot());
                }
                com.kedacom.uc.transmit.socket.j.b.a().b(arrayList);
                return Observable.fromIterable(cached);
            }
        }).map(new Function<OfflineMessageLocalCache, Optional<Void>>() {
            @Override // io.reactivex.functions.Function
            public Optional<Void> apply(OfflineMessageLocalCache offlineMessageLocalCache) throws Exception {
                DefaultSignalMessage defaultSignalMessage = (DefaultSignalMessage) PacketLossApi.gson.fromJson(offlineMessageLocalCache.getMsgJson(), PacketLossApi.this.type);
                PacketLossApi.this.logger.info(">> reSendLocalMsgCache : {}", defaultSignalMessage);
                SignalSocketReq.getInstance().publishSignalMessageSub(Optional.ofNullable(p.a().a(defaultSignalMessage)));
                return Optional.absent();
            }
        }).buffer(Integer.MAX_VALUE).retryWhen(new Function<Observable<Throwable>, ObservableSource<List<Optional<Void>>>>() {
            @Override // io.reactivex.functions.Function
            public ObservableSource<List<Optional<Void>>> apply(Observable<Throwable> observable) throws Exception {
                return observable.map(new Function<Throwable, List<Optional<Void>>>() {
                    @Override // io.reactivex.functions.Function
                    public List<Optional<Void>> apply(Throwable th) throws Exception {
                        if ((th instanceof ResponseException) && ((ResponseException) th).getCode() == ResultCode.PARAM_IS_NULL) {
                            // Expected empty-cache case: sleep until new data is announced or the timeout elapses.
                            synchronized (PacketLossApi.this.pullHolder) {
                                PacketLossApi.this.pullHolder.wait(PacketLossApi.this.WAIT_SPACE);
                            }
                        } else {
                            // Only genuinely unexpected failures are reported as errors.
                            PacketLossApi.this.logger.error("print get local cache data unknow err :", th);
                        }
                        // Any emission tells retryWhen to resubscribe.
                        return new ArrayList<Optional<Void>>();
                    }
                });
            }
        }).repeat().subscribeOn(Schedulers.io()).observeOn(Schedulers.io()).subscribe(RxHelper.NOTHING, RxHelper.DEFAULT_EXCEPTION_HANDLER);
    }

    /**
     * Tears the instance down: disposes all tracked subscriptions and the re-send loop, calls the
     * release methods of the {@code j.a}, {@code j.b} and {@code c} helpers, shuts down the executor
     * and scheduler, and clears the singleton reference so the next {@code getInstance()} builds a
     * fresh instance.
     */
    public void release() {
        // 1. Stop every running pipeline.
        final CompositeDisposable tracked = this.compositeDisposable;
        if (tracked != null) {
            tracked.dispose();
        }
        final Disposable resendLoop = this.pullDataDis;
        if (resendLoop != null) {
            resendLoop.dispose();
        }

        // 2. Let the helper singletons release their own state.
        com.kedacom.uc.transmit.socket.j.a.a().c();
        com.kedacom.uc.transmit.socket.j.b.a().e();
        c.a().c();

        // 3. Shut down the threads behind the pipelines.
        final ExecutorService pool = this.threadPoolExecutor;
        if (pool != null) {
            pool.shutdownNow();
        }
        this.threadPoolExecutor = null;
        final Scheduler rxScheduler = this.scheduler;
        if (rxScheduler != null) {
            rxScheduler.shutdown();
        }
        this.scheduler = null;

        // 4. Drop the singleton.
        instance = null;
    }
}
