package com.taobao.message.eventengine.core;

import androidx.annotation.NonNull;
import com.alibaba.fastjson.JSON;
import com.android.alibaba.ip.runtime.IpChange;
import com.taobao.android.virtual_thread.face.ThreadNameFactory;
import com.taobao.android.virtual_thread.face.VExecutors;
import com.taobao.message.eventengine.event.EventRepository;
import com.taobao.message.eventengine.event.EventRepositoryImpl;
import com.taobao.message.eventengine.report.UploadManager;
import com.taobao.message.kit.apmmonitor.toolbox.RandomUtil;
import com.taobao.message.kit.config.ConfigCenterManager;
import com.taobao.message.kit.util.Env;
import com.taobao.message.kit.util.MessageLog;
import com.taobao.message.kit.util.TextUtils;
import com.taobao.message.kit.util.TimeStamp;
import io.reactivex.disposables.a;
import io.reactivex.e;
import io.reactivex.functions.Action;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.functions.Predicate;
import io.reactivex.subjects.PublishSubject;
import io.reactivex.subjects.c;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/* loaded from: classes3.dex */
public class EventDispatcher {
    // Hot-patch hook: each method of this class reads this field first and, when it holds an
    // IpChange, forwards the call through ipc$dispatch instead of executing its own body.
    public static volatile transient /* synthetic */ IpChange $ipChange = null;
    // Same value as the literal queue capacity passed to the executor factory below.
    private static final int QUEUE_CAPACITY = 4096;
    // Tag passed to MessageLog for every log line written by this class.
    private static final String TAG = "EventDispatcher";
    // Shared (static) executor on which subscribe() observes the published event stream.
    // Created with the arguments (1, 1, 60 seconds, LinkedBlockingQueue(4096), name factory,
    // DiscardOldestPolicy) - presumably core size, max size and keep-alive as in
    // ThreadPoolExecutor; confirm against VExecutors' thread-pool factory.
    private static ExecutorService mDispatchExecutor = VExecutors.getThreadPoolFactory().createExecutorService(1, 1, 60, TimeUnit.SECONDS, new LinkedBlockingQueue(4096), new ThreadNameFactory() { // from class: com.taobao.message.eventengine.core.EventDispatcher.1
        // Hot-patch hook for this anonymous class.
        public static volatile transient /* synthetic */ IpChange $ipChange;
        // Sequence number appended to generated thread names; starts at 1.
        private final AtomicInteger mCount = new AtomicInteger(1);

        // Produces names of the form "MessageEvent-<n>" and logs each one at debug level.
        @Override // com.taobao.android.virtual_thread.face.ThreadNameFactory
        @NonNull
        public String newThreadName() {
            IpChange ipChange = $ipChange;
            if (ipChange instanceof IpChange) {
                return (String) ipChange.ipc$dispatch("57dab4a4", new Object[]{this});
            }
            String str = "MessageEvent-" + this.mCount.getAndIncrement();
            MessageLog.d(EventDispatcher.TAG, "Spawning ", str);
            return str;
        }
    }, new ThreadPoolExecutor.DiscardOldestPolicy());
    // Store that action() uses to save events of strategies marked stable, to seed a topic's
    // buffer, and to delete events after a successful upload.
    private EventRepository mEventRepository;
    // Identifier given to the constructor; handed to the repository and the upload manager.
    private String mIdentifier;
    // Performs the actual upload of event batches.
    private UploadManager mUploadManager;
    // Subject that publish() pushes events into and subscribe() listens on.
    // NOTE(review): the factory calls are obfuscated - presumably
    // PublishSubject.create().toSerialized(); confirm.
    private c<Object> mPublish = PublishSubject.a().a();
    // Collects every subscription made by this instance so shotdown() can dispose them together.
    // NOTE(review): obfuscated type - presumably CompositeDisposable; confirm.
    private a mCompositeDisposable = new a();
    // Pending (not yet confirmed uploaded) events, keyed by event name.
    private Map<String, List<ClientEvent>> mBuffer = new HashMap();
    // Remembers the timestamp of the last dispatch per event name.
    private DispatcherHistory mDispatcherHistory = new DispatcherHistory();

    /* loaded from: classes3.dex */
    /**
     * Marker strategy: when {@code action} receives an instance of this type it uploads the
     * incoming events immediately and returns, without persisting or buffering them.
     */
    public static final class DirectCommitStrategy extends Strategy {
    }

    /* loaded from: classes3.dex */
    /**
     * Batching policy for one subscribed event name. Instances are also serialized with
     * {@code JSON.toJSONString} for logging.
     */
    public static class Strategy {
        /**
         * Elapsed time since the last recorded dispatch at or above which the buffer is uploaded.
         * Compared against a difference of {@code TimeStamp.getCurrentTimeStamp()} values; the unit
         * is not visible here (presumably milliseconds - confirm).
         */
        public long bufferTime = 0;
        /** Buffer size at or above which the buffer is uploaded regardless of elapsed time. */
        public long bufferCount = 100;
        /**
         * When true, events are saved to the event repository before being buffered and are
         * removed from it after a successful upload; subscribe() may also schedule a delayed
         * flush for such strategies.
         */
        public boolean stable = false;
        /**
         * Optional key looked up in each event's ext map. Events that share the same value for
         * this key are collapsed to the one with the greatest event time before upload.
         * {@code null} disables merging.
         */
        public String mergeTopicKey = null;
    }

    static {
        // Only a real ThreadPoolExecutor exposes allowCoreThreadTimeOut; other ExecutorService
        // implementations returned by the factory are left untouched.
        if (mDispatchExecutor instanceof ThreadPoolExecutor) {
            ThreadPoolExecutor pool = (ThreadPoolExecutor) mDispatchExecutor;
            pool.allowCoreThreadTimeOut(true);
        }
    }

    /**
     * Creates a dispatcher bound to one identifier.
     *
     * @param str identifier handed to the event repository and the upload manager
     */
    public EventDispatcher(String str) {
        this.mIdentifier = str;
        // Repository first, upload manager second - same construction order as before.
        this.mEventRepository = new EventRepositoryImpl(str);
        this.mUploadManager = new UploadManager(str);
    }

    /**
     * Synthetic bridge that lets the anonymous inner classes call the private
     * {@code action} method of the given dispatcher.
     */
    public static /* synthetic */ void access$000(EventDispatcher eventDispatcher, String str, Strategy strategy, List list) {
        final IpChange patch = $ipChange;
        if (!(patch instanceof IpChange)) {
            eventDispatcher.action(str, strategy, list);
            return;
        }
        // A hot patch is installed: let it handle the call.
        patch.ipc$dispatch("ce54c3dd", new Object[]{eventDispatcher, str, strategy, list});
    }

    /**
     * Synthetic bridge that exposes the dispatcher's private per-topic buffer map to the
     * anonymous inner classes.
     */
    public static /* synthetic */ Map access$100(EventDispatcher eventDispatcher) {
        final IpChange patch = $ipChange;
        if (patch instanceof IpChange) {
            // A hot patch is installed: return whatever it provides.
            return (Map) patch.ipc$dispatch("d72cc7f", new Object[]{eventDispatcher});
        }
        return eventDispatcher.mBuffer;
    }

    /**
     * Synthetic bridge that exposes the dispatcher's private event repository to the
     * anonymous inner classes.
     */
    public static /* synthetic */ EventRepository access$200(EventDispatcher eventDispatcher) {
        final IpChange patch = $ipChange;
        if (patch instanceof IpChange) {
            // A hot patch is installed: return whatever it provides.
            return (EventRepository) patch.ipc$dispatch("b4868bfb", new Object[]{eventDispatcher});
        }
        return eventDispatcher.mEventRepository;
    }

    /**
     * Handles one batch of events for the topic {@code str}: either uploads it directly, or
     * buffers it and uploads the buffer once the strategy's thresholds are met.
     *
     * <ol>
     *   <li>For a {@link DirectCommitStrategy} the batch is uploaded immediately and nothing
     *       is buffered.</li>
     *   <li>Otherwise the batch is saved to the repository when {@code strategy.stable} is set,
     *       and appended to the per-topic buffer. The first time a topic is seen its buffer is
     *       seeded from {@code EventRepository.getSortedEvent()}.</li>
     *   <li>The first call for a topic only records the dispatch time. Later calls upload when
     *       the elapsed time reaches {@code bufferTime} or the buffer size reaches
     *       {@code bufferCount}.</li>
     *   <li>After a successful, non-empty upload, exactly the events that were in the buffer
     *       when the upload started are removed from the buffer and, for stable strategies,
     *       from the repository.</li>
     * </ol>
     *
     * @param str      event name (topic) being dispatched
     * @param strategy batching policy for the topic; must not be null
     * @param list     newly arrived events; may be empty, must not be null
     */
    private void action(final String str, final Strategy strategy, List<ClientEvent> list) {
        IpChange ipChange = $ipChange;
        if (ipChange instanceof IpChange) {
            ipChange.ipc$dispatch("218a0b2f", new Object[]{this, str, strategy, list});
            return;
        }
        if (Env.isDebug()) {
            MessageLog.d(TAG, "AcionEvent|", str, "|", Integer.valueOf(list.size()), "|", JSON.toJSONString(list), "|", JSON.toJSONString(strategy));
        } else {
            MessageLog.e(TAG, "AcionEvent|", str, "|", Integer.valueOf(list.size()), "|", JSON.toJSONString(strategy));
        }
        if (strategy instanceof DirectCommitStrategy) {
            this.mCompositeDisposable.add(this.mUploadManager.upload(list).subscribe(new Consumer<List<ClientEvent>>() { // from class: com.taobao.message.eventengine.core.EventDispatcher.9
                public static volatile transient /* synthetic */ IpChange $ipChange;

                @Override // io.reactivex.functions.Consumer
                public void accept(List<ClientEvent> list3) {
                    IpChange ipChange2 = $ipChange;
                    if (ipChange2 instanceof IpChange) {
                        ipChange2.ipc$dispatch("1d130480", new Object[]{this, list3});
                    } else if (Env.isDebug()) {
                        MessageLog.d(EventDispatcher.TAG, "UploadSuccess|", str, "|", Integer.valueOf(list3.size()), "|", JSON.toJSONString(list3));
                    } else {
                        MessageLog.e(EventDispatcher.TAG, "UploadSuccess|", str, "|", Integer.valueOf(list3.size()));
                    }
                }
            }, new Consumer<Throwable>() { // from class: com.taobao.message.eventengine.core.EventDispatcher.10
                public static volatile transient /* synthetic */ IpChange $ipChange;

                @Override // io.reactivex.functions.Consumer
                public void accept(Throwable th) throws Exception {
                    IpChange ipChange2 = $ipChange;
                    if (ipChange2 instanceof IpChange) {
                        ipChange2.ipc$dispatch("5d8addc6", new Object[]{this, th});
                        return;
                    }
                    MessageLog.e(EventDispatcher.TAG, "UploadError|" + th.toString());
                }
            }));
            return;
        }

        final EventRepository eventRepository = this.mEventRepository;
        // True only when the batch really went to the repository.
        final boolean persisted = strategy.stable && eventRepository != null;
        if (persisted) {
            eventRepository.save(list);
        }

        List<ClientEvent> buffered = this.mBuffer.get(str);
        if (buffered != null) {
            buffered.addAll(list);
        } else {
            // First batch for this topic: seed the buffer from the repository.
            buffered = eventRepository != null ? eventRepository.getSortedEvent() : null;
            if (buffered == null) {
                buffered = new ArrayList<>();
            }
            if (!persisted) {
                // The batch was not saved above, so the repository snapshot cannot contain it;
                // without this the very first batch of a topic would be silently dropped.
                // NOTE(review): assumes getSortedEvent() already returns a just-saved batch for
                // the persisted case - confirm against EventRepositoryImpl.
                buffered.addAll(list);
            }
            this.mBuffer.put(str, buffered);
        }

        long lastDispatch = this.mDispatcherHistory.getDispatchTime(str);
        long now = TimeStamp.getCurrentTimeStamp();
        if (lastDispatch == 0) {
            // Nothing dispatched yet for this topic: start the buffering window and wait.
            this.mDispatcherHistory.saveDispatchTime(str, now);
            return;
        }
        if (now - lastDispatch < strategy.bufferTime && buffered.size() < strategy.bufferCount) {
            return;
        }

        // Freeze what is being uploaded. The buffer list stays live and may receive more events
        // before the upload completes; the completion callback must remove only this snapshot.
        final List<ClientEvent> pending = new ArrayList<>(buffered);
        if (Env.isDebug()) {
            MessageLog.d(TAG, "Upload|", str, "|", Integer.valueOf(pending.size()), "|", JSON.toJSONString(pending));
        } else {
            MessageLog.e(TAG, "Upload|", str, "|", Integer.valueOf(pending.size()));
        }
        this.mDispatcherHistory.saveDispatchTime(str, now);

        final List<ClientEvent> toUpload = mergeByTopicKey(pending, strategy.mergeTopicKey);
        if (Env.isDebug()) {
            MessageLog.d(TAG, "Upload|", str, "|", Integer.valueOf(toUpload.size()), "|", JSON.toJSONString(toUpload));
        } else {
            MessageLog.e(TAG, "Upload|", str, "|", Integer.valueOf(toUpload.size()));
        }
        if (toUpload.isEmpty()) {
            return;
        }
        // NOTE(review): the thread these callbacks run on is decided by UploadManager and is not
        // visible here; the buffer lists are plain ArrayLists - confirm they are not touched
        // concurrently with the dispatch executor.
        this.mCompositeDisposable.add(this.mUploadManager.upload(toUpload).subscribe(new Consumer<List<ClientEvent>>() { // from class: com.taobao.message.eventengine.core.EventDispatcher.11
            public static volatile transient /* synthetic */ IpChange $ipChange;

            @Override // io.reactivex.functions.Consumer
            public void accept(List<ClientEvent> list3) {
                IpChange ipChange2 = $ipChange;
                if (ipChange2 instanceof IpChange) {
                    ipChange2.ipc$dispatch("1d130480", new Object[]{this, list3});
                    return;
                }
                if (Env.isDebug()) {
                    MessageLog.d(EventDispatcher.TAG, "UploadSuccess|", str, "|", Integer.valueOf(list3.size()), "|", JSON.toJSONString(list3));
                } else {
                    MessageLog.e(EventDispatcher.TAG, "UploadSuccess|", str, "|", Integer.valueOf(list3.size()));
                }
                if (list3.isEmpty()) {
                    return;
                }
                // Remove the snapshot, not the live buffer: removing the live list from itself
                // would also drop events that arrived after the upload started, and would then
                // hand an already-emptied list to the repository.
                List<?> live = (List<?>) EventDispatcher.access$100(EventDispatcher.this).get(str);
                if (live != null) {
                    live.removeAll(pending);
                }
                if (strategy.stable) {
                    EventRepository repository = EventDispatcher.access$200(EventDispatcher.this);
                    if (repository != null) {
                        repository.remove(pending);
                    }
                }
            }
        }, new Consumer<Throwable>() { // from class: com.taobao.message.eventengine.core.EventDispatcher.12
            public static volatile transient /* synthetic */ IpChange $ipChange;

            @Override // io.reactivex.functions.Consumer
            public void accept(Throwable th) throws Exception {
                IpChange ipChange2 = $ipChange;
                if (ipChange2 instanceof IpChange) {
                    ipChange2.ipc$dispatch("5d8addc6", new Object[]{this, th});
                    return;
                }
                MessageLog.e(EventDispatcher.TAG, "UploadError|" + th.toString());
            }
        }));
    }

    /**
     * Builds the list to upload from a buffer snapshot. With a null key the snapshot is copied
     * as is. Otherwise events whose ext map contains the key are collapsed to one event per
     * distinct value (the one with the greatest event time; ties go to the later entry), and
     * events without the key are kept unchanged, ahead of the collapsed ones.
     */
    private static List<ClientEvent> mergeByTopicKey(List<ClientEvent> events, String mergeTopicKey) {
        if (mergeTopicKey == null) {
            return new ArrayList<>(events);
        }
        List<ClientEvent> result = new ArrayList<>();
        Map<Object, ClientEvent> latestByTopic = new HashMap<>();
        for (ClientEvent event : events) {
            if (event.getExt() == null || !event.getExt().containsKey(mergeTopicKey)) {
                result.add(event);
                continue;
            }
            Object topic = event.getExt().get(mergeTopicKey);
            ClientEvent current = latestByTopic.get(topic);
            if (current == null || event.getEventTime() >= current.getEventTime()) {
                latestByTopic.put(topic, event);
            }
        }
        result.addAll(latestByTopic.values());
        return result;
    }

    /**
     * Has no behaviour of its own: the call is only forwarded to an installed hot patch,
     * and is a no-op when none is installed.
     */
    public void dispatchNow() {
        final IpChange patch = $ipChange;
        if (!(patch instanceof IpChange)) {
            return;
        }
        patch.ipc$dispatch("c8540a83", new Object[]{this});
    }

    /**
     * Pushes an event into the shared subject; every pipeline created by
     * {@code subscribe} whose event name matches will receive it.
     *
     * @param clientEvent the event to publish
     */
    public void publish(ClientEvent clientEvent) {
        final IpChange patch = $ipChange;
        if (patch instanceof IpChange) {
            patch.ipc$dispatch("b015523e", new Object[]{this, clientEvent});
            return;
        }
        this.mPublish.onNext(clientEvent);
    }

    /**
     * Disposes every subscription this dispatcher has registered (event pipelines, delayed
     * flush timers and in-flight upload subscriptions).
     */
    public void shotdown() {
        final IpChange patch = $ipChange;
        if (patch instanceof IpChange) {
            patch.ipc$dispatch("618dc923", new Object[]{this});
            return;
        }
        this.mCompositeDisposable.dispose();
    }

    /**
     * Starts handling events named {@code str} according to {@code strategy}.
     *
     * <p>Published events are observed on the shared dispatch executor, narrowed to
     * {@code ClientEvent}s whose event name equals {@code str}, wrapped into single-element
     * lists and passed to {@code action}. When the "engineUploadCold" container config equals
     * "1" (its default) and the strategy is stable, a one-shot timer additionally calls
     * {@code action} with an empty batch after a randomized delay.
     *
     * @param str      event name to listen for
     * @param strategy batching policy applied to matching events
     */
    public void subscribe(final String str, final Strategy strategy) {
        final IpChange patch = $ipChange;
        if (patch instanceof IpChange) {
            patch.ipc$dispatch("6c72752", new Object[]{this, str, strategy});
            return;
        }
        this.mCompositeDisposable.add(this.mPublish.observeOn(io.reactivex.schedulers.a.a(mDispatchExecutor)).ofType(ClientEvent.class).filter(new Predicate<ClientEvent>() { // from class: com.taobao.message.eventengine.core.EventDispatcher.6
            public static volatile transient /* synthetic */ IpChange $ipChange;

            // Keeps only events published under the subscribed name.
            @Override // io.reactivex.functions.Predicate
            public boolean test(ClientEvent candidate) throws Exception {
                IpChange hook = $ipChange;
                if (hook instanceof IpChange) {
                    return ((Boolean) hook.ipc$dispatch("d9a3d69f", new Object[]{this, candidate})).booleanValue();
                }
                return TextUtils.equals(str, candidate.getEventName());
            }
        }).map(new Function<ClientEvent, List<ClientEvent>>() { // from class: com.taobao.message.eventengine.core.EventDispatcher.5
            public static volatile transient /* synthetic */ IpChange $ipChange;

            // action() works on batches, so each event travels as a one-element list.
            @Override // io.reactivex.functions.Function
            public List<ClientEvent> apply(ClientEvent single) throws Exception {
                IpChange hook = $ipChange;
                if (hook instanceof IpChange) {
                    return (List) hook.ipc$dispatch("5cfa2328", new Object[]{this, single});
                }
                return Collections.singletonList(single);
            }
        }).subscribe(new Consumer<List<ClientEvent>>() { // from class: com.taobao.message.eventengine.core.EventDispatcher.2
            public static volatile transient /* synthetic */ IpChange $ipChange;

            @Override // io.reactivex.functions.Consumer
            public void accept(List<ClientEvent> batch) throws Exception {
                IpChange hook = $ipChange;
                if (hook instanceof IpChange) {
                    hook.ipc$dispatch("1d130480", new Object[]{this, batch});
                    return;
                }
                EventDispatcher.access$000(EventDispatcher.this, str, strategy, batch);
            }
        }, new Consumer<Throwable>() { // from class: com.taobao.message.eventengine.core.EventDispatcher.3
            public static volatile transient /* synthetic */ IpChange $ipChange;

            @Override // io.reactivex.functions.Consumer
            public void accept(Throwable error) throws Exception {
                IpChange hook = $ipChange;
                if (hook instanceof IpChange) {
                    hook.ipc$dispatch("5d8addc6", new Object[]{this, error});
                    return;
                }
                MessageLog.e(EventDispatcher.TAG, error.toString());
            }
        }, new Action() { // from class: com.taobao.message.eventengine.core.EventDispatcher.4
            public static volatile transient /* synthetic */ IpChange $ipChange;

            @Override // io.reactivex.functions.Action
            public void run() throws Exception {
                IpChange hook = $ipChange;
                if (hook instanceof IpChange) {
                    hook.ipc$dispatch("5c510192", new Object[]{this});
                    return;
                }
                MessageLog.e(EventDispatcher.TAG, "completed");
            }
        }));

        // The config is read before strategy.stable is examined, as before.
        final boolean coldUploadEnabled = "1".equals(ConfigCenterManager.getContainerConfig("engineUploadCold", "1"));
        if (coldUploadEnabled && strategy.stable) {
            // One-shot delayed call of action() with an empty batch; the delay is 6000 ms plus
            // RandomUtil.getRandomNumberInRange(6000).
            this.mCompositeDisposable.add(e.timer(RandomUtil.getRandomNumberInRange(6000) + 6000, TimeUnit.MILLISECONDS).subscribe(new Consumer<Long>() { // from class: com.taobao.message.eventengine.core.EventDispatcher.7
                public static volatile transient /* synthetic */ IpChange $ipChange;

                @Override // io.reactivex.functions.Consumer
                public void accept(Long tick) throws Exception {
                    IpChange hook = $ipChange;
                    if (hook instanceof IpChange) {
                        hook.ipc$dispatch("b4e4afce", new Object[]{this, tick});
                        return;
                    }
                    EventDispatcher.access$000(EventDispatcher.this, str, strategy, new ArrayList());
                }
            }, new Consumer<Throwable>() { // from class: com.taobao.message.eventengine.core.EventDispatcher.8
                public static volatile transient /* synthetic */ IpChange $ipChange;

                @Override // io.reactivex.functions.Consumer
                public void accept(Throwable error) throws Exception {
                    IpChange hook = $ipChange;
                    if (hook instanceof IpChange) {
                        hook.ipc$dispatch("5d8addc6", new Object[]{this, error});
                        return;
                    }
                    MessageLog.e(EventDispatcher.TAG, "error|" + str + "|" + error.toString());
                }
            }));
        }
    }
}
