package risesoft.data.transfer.core.statistics;

import java.text.DecimalFormat;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.Validate;
import org.apache.commons.lang3.time.DateFormatUtils;
import risesoft.data.transfer.core.context.JobContext;
import risesoft.data.transfer.core.record.Record;
import risesoft.data.transfer.core.util.StrUtil;

/* loaded from: input_file:risesoft/data/transfer/core/statistics/CommunicationTool.class */
/**
 * Static helpers for reading, deriving and rendering the counters stored in a
 * {@link Communication}.
 *
 * <p>The string constants are the counter keys used with
 * {@code getLongCounter}/{@code setLongCounter} and
 * {@code getDoubleCounter}/{@code setDoubleCounter}.
 */
public final class CommunicationTool {
    /** Counter key read as the numerator of the progress ratio in {@link #getReportCommunication}. */
    public static final String STAGE = "stage";
    /** Counter key for the derived read throughput in bytes per reporting interval second. */
    public static final String BYTE_SPEED = "byteSpeed";
    /** Counter key for the derived read throughput in records per reporting interval second. */
    public static final String RECORD_SPEED = "recordSpeed";
    /** Counter key for the derived progress ratio (stage / total stages). */
    public static final String PERCENTAGE = "percentage";
    public static final String READ_SUCCEED_RECORDS = "readSucceedRecords";
    public static final String READ_SUCCEED_BYTES = "readSucceedBytes";
    public static final String READ_FAILED_RECORDS = "readFailedRecords";
    public static final String READ_FAILED_BYTES = "readFailedBytes";
    public static final String WRITE_RECEIVED_RECORDS = "writeReceivedRecords";
    public static final String WRITE_RECEIVED_BYTES = "writeReceivedBytes";
    public static final String WRITE_FAILED_RECORDS = "writeFailedRecords";
    public static final String WRITE_FAILED_BYTES = "writeFailedBytes";
    /** Derived counter key: received minus failed records; populated by {@link #getReportCommunication}. */
    private static final String WRITE_SUCCEED_RECORDS = "writeSucceedRecords";
    /** Derived counter key: received minus failed bytes; populated by {@link #getReportCommunication}. */
    private static final String WRITE_SUCCEED_BYTES = "writeSucceedBytes";
    public static final String READ_JOB_END = "readJobEnd";
    public static final String READ_JOB_START = "readJobStart";
    public static final String WRITER_JOB_END = "writerJobEnd";
    public static final String WRITER_JOB_START = "writerJobStart";

    /** Formatting helpers that turn raw counter values into human-readable strings. */
    public static class Stringify {
        // NOTE(review): DecimalFormat is not thread-safe; this shared instance is only
        // used by the private getPercentage below - confirm callers are single-threaded.
        private static final DecimalFormat df = new DecimalFormat("0.00");

        /**
         * Renders the {@link CommunicationTool#BYTE_SPEED} and
         * {@link CommunicationTool#RECORD_SPEED} counters as {@code "<bytes>/s, <n> records/s"}.
         */
        private static String getSpeed(Communication communication) {
            return String.format("%s/s, %d records/s", StrUtil.stringify(communication.getLongCounter(CommunicationTool.BYTE_SPEED).longValue()), communication.getLongCounter(CommunicationTool.RECORD_SPEED));
        }

        /**
         * Formats a duration given in nanoseconds as seconds with three decimals.
         *
         * @param j duration in nanoseconds
         * @return e.g. {@code "1.500s"}
         */
        public static String unitTime(long j) {
            return unitTime(j, TimeUnit.NANOSECONDS);
        }

        /**
         * Formats a duration as seconds with three decimals and grouping separators.
         *
         * @param j        duration expressed in {@code timeUnit}
         * @param timeUnit unit of {@code j}
         * @return e.g. {@code "1,234.568s"}
         */
        public static String unitTime(long j, TimeUnit timeUnit) {
            // double arithmetic: float keeps only ~7 significant digits, which drops the
            // millisecond part once the duration reaches a few hours.
            double seconds = timeUnit.toNanos(j) / 1.0E9d;
            return String.format("%,.3fs", Double.valueOf(seconds));
        }

        /**
         * Formats a byte count using decimal (power-of-1000) units.
         *
         * <p>Thresholds are strict: a value must exceed 1000 / 1000000 / 1000000000 to be
         * shown as K / M / G; anything else is printed as a plain byte count.
         *
         * @param j size in bytes
         * @return e.g. {@code "1.50K"}, {@code "2.00M"}, {@code "999B"}
         */
        public static String unitSize(long j) {
            // double arithmetic avoids float precision loss on large sizes.
            if (j > 1000000000) {
                return String.format("%,.2fG", Double.valueOf(j / 1.0E9d));
            }
            if (j > 1000000) {
                return String.format("%,.2fM", Double.valueOf(j / 1000000.0d));
            }
            if (j > 1000) {
                return String.format("%,.2fK", Double.valueOf(j / 1000.0d));
            }
            return j + "B";
        }

        /** Renders the {@link CommunicationTool#PERCENTAGE} ratio as a percentage with two decimals. */
        private static String getPercentage(Communication communication) {
            return df.format(communication.getDoubleCounter(CommunicationTool.PERCENTAGE).doubleValue() * 100.0d) + "%";
        }
    }

    /**
     * Fills the derived counters of the current communication so it can be reported.
     *
     * <p>Sets the write-succeed record/byte counters, the byte and record read speed
     * relative to the previous snapshot, and the progress ratio, then copies over the
     * previous snapshot's throwable if it has one.
     *
     * @param communication  current snapshot; mutated and returned
     * @param communication2 previous snapshot used as the baseline for speed calculation
     * @param i              total number of stages; when not positive the ratio is reported as 0
     * @return {@code communication}, with the derived counters set
     * @throws IllegalArgumentException if either snapshot is {@code null}
     */
    public static Communication getReportCommunication(Communication communication, Communication communication2, int i) {
        Validate.isTrue(communication != null && communication2 != null, "为汇报准备的新旧metric不能为null", new Object[0]);
        long totalReadRecords = getTotalReadRecords(communication);
        long totalReadBytes = getTotalReadBytes(communication);
        communication.setLongCounter(WRITE_SUCCEED_RECORDS, getWriteSucceedRecords(communication));
        communication.setLongCounter(WRITE_SUCCEED_BYTES, getWriteSucceedBytes(communication));

        // NOTE(review): timestamps are presumably epoch milliseconds - confirm in Communication.
        long elapsedMillis = communication.getTimestamp() - communication2.getTimestamp();
        // Anything up to one second (including a negative gap) counts as one interval
        // unit so the divisions below can never divide by zero.
        long elapsedSeconds = elapsedMillis <= 1000 ? 1L : elapsedMillis / 1000;
        long byteSpeed = (totalReadBytes - getTotalReadBytes(communication2)) / elapsedSeconds;
        long recordSpeed = (totalReadRecords - getTotalReadRecords(communication2)) / elapsedSeconds;
        // Speeds are clamped at zero in case the previous snapshot has larger totals.
        communication.setLongCounter(BYTE_SPEED, Math.max(0L, byteSpeed));
        communication.setLongCounter(RECORD_SPEED, Math.max(0L, recordSpeed));

        // Divide in floating point: long / int would truncate the ratio to a whole
        // number before it is stored in the double counter.
        long stage = communication.getLongCounter(STAGE).longValue();
        double percentage = i > 0 ? (double) stage / i : 0.0d;
        communication.setDoubleCounter(PERCENTAGE, percentage);

        if (communication2.getThrowable() != null) {
            communication.setThrowable(communication2.getThrowable());
        }
        return communication;
    }

    /** Returns succeeded plus failed read records. */
    public static long getTotalReadRecords(Communication communication) {
        long succeeded = communication.getLongCounter(READ_SUCCEED_RECORDS).longValue();
        long failed = communication.getLongCounter(READ_FAILED_RECORDS).longValue();
        return succeeded + failed;
    }

    /** Returns succeeded plus failed read bytes. */
    public static long getTotalReadBytes(Communication communication) {
        long succeeded = communication.getLongCounter(READ_SUCCEED_BYTES).longValue();
        long failed = communication.getLongCounter(READ_FAILED_BYTES).longValue();
        return succeeded + failed;
    }

    /**
     * Builds the multi-line end-of-job summary.
     *
     * <p>The rows are, in order: total traffic, start time, end time, input window,
     * output window, total elapsed seconds, average traffic, record speed, total read
     * records and total failed records. Averages are computed over the whole-second
     * elapsed time, with an elapsed time of zero treated as one second.
     *
     * @param communication final job communication
     * @return the formatted summary, starting and ending with a newline
     */
    public static String getStatistics(Communication communication) {
        long elapsedSeconds = (communication.getEndTime() - communication.getStartTime()) / 1000;
        if (0 == elapsedSeconds) {
            // Avoid dividing by zero for jobs that finish within a second.
            elapsedSeconds = 1;
        }
        long readSucceedBytes = communication.getLongCounter(READ_SUCCEED_BYTES).longValue();
        long readSucceedRecords = communication.getLongCounter(READ_SUCCEED_RECORDS).longValue();

        String readWindow = formatJobWindow("%s-%s %s/s",
                communication.getLongCounter(READ_JOB_START).longValue(),
                communication.getLongCounter(READ_JOB_END).longValue());
        String writeWindow = formatJobWindow("%s-%s %d/s",
                communication.getLongCounter(WRITER_JOB_START).longValue(),
                communication.getLongCounter(WRITER_JOB_END).longValue());

        return String.format("\n%-26s: %19s\n%-26s: %-18s\n%-26s: %-18s\n%-26s: %19s\n%-26s: %19s\n%-26s: %19s\n%-26s: %19s\n%-26s: %19s\n%-26s: %19s\n%-26s: %19s\n",
                "任务总流量", StrUtil.stringify(readSucceedBytes),
                "任务启动时刻", DateFormatUtils.format(new Date(communication.getStartTime()), "yyyy-MM-dd HH:mm:ss"),
                "任务结束时刻", DateFormatUtils.format(new Date(communication.getEndTime()), "yyyy-MM-dd HH:mm:ss"),
                "任务输入情况", readWindow,
                "任务输出情况", writeWindow,
                "任务总计耗时", elapsedSeconds + "s",
                "任务平均流量", StrUtil.stringify(readSucceedBytes / elapsedSeconds) + "/s",
                "记录写入速度", String.valueOf(readSucceedRecords / elapsedSeconds) + "rec/s",
                "读出记录总数", String.valueOf(getTotalReadRecords(communication)),
                "读写失败总数", String.valueOf(getTotalErrorRecords(communication)));
    }

    /**
     * Formats a start/end pair as {@code HH:mm:ss} times followed by the whole-second
     * difference between them, using the caller-supplied three-argument format.
     */
    private static String formatJobWindow(String format, long startMillis, long endMillis) {
        return String.format(format,
                DateFormatUtils.format(new Date(startMillis), "HH:mm:ss"),
                DateFormatUtils.format(new Date(endMillis), "HH:mm:ss"),
                Long.valueOf((endMillis - startMillis) / 1000));
    }

    /** Returns failed read records plus failed write records. */
    public static long getTotalErrorRecords(Communication communication) {
        long readFailed = communication.getLongCounter(READ_FAILED_RECORDS).longValue();
        long writeFailed = communication.getLongCounter(WRITE_FAILED_RECORDS).longValue();
        return readFailed + writeFailed;
    }

    /**
     * Sums the byte size of the records in the index range {@code [i, i2)}.
     *
     * @param list records to measure
     * @param i    first index, inclusive
     * @param i2   last index, exclusive
     * @return total of {@code getByteSize()} over the range; 0 when the range is empty
     */
    public static long getRecordSize(List<Record> list, int i, int i2) {
        long totalBytes = 0;
        for (int index = i; index < i2; index++) {
            totalBytes += list.get(index).getByteSize();
        }
        return totalBytes;
    }

    /** Sums the byte size of every record in {@code list}. */
    public static long getRecordSize(List<Record> list) {
        return getRecordSize(list, 0, list.size());
    }

    /** Returns failed read bytes plus failed write bytes. */
    public static long getTotalErrorBytes(Communication communication) {
        long readFailed = communication.getLongCounter(READ_FAILED_BYTES).longValue();
        long writeFailed = communication.getLongCounter(WRITE_FAILED_BYTES).longValue();
        return readFailed + writeFailed;
    }

    /** Returns records received by the writer minus records the writer failed on. */
    public static long getWriteSucceedRecords(Communication communication) {
        long received = communication.getLongCounter(WRITE_RECEIVED_RECORDS).longValue();
        long failed = communication.getLongCounter(WRITE_FAILED_RECORDS).longValue();
        return received - failed;
    }

    /** Returns bytes received by the writer minus bytes the writer failed on. */
    public static long getWriteSucceedBytes(Communication communication) {
        long received = communication.getLongCounter(WRITE_RECEIVED_BYTES).longValue();
        long failed = communication.getLongCounter(WRITE_FAILED_BYTES).longValue();
        return received - failed;
    }

    /**
     * Maps a job state to its display label.
     *
     * @param state state to describe; may be {@code null}
     * @return the label for the state, or the "unknown" label for {@code null} and
     *         for any state without a dedicated label
     */
    public static String getStatus(State state) {
        if (state == null) {
            // A switch on a null enum would throw NullPointerException.
            return "未知";
        }
        switch (state) {
            case RUNNING:
                return "运行中";
            case WAITING:
                return "休眠中";
            case FAILED:
                return "执行失败";
            case SUCCEEDED:
                return "成功了";
            default:
                return "未知";
        }
    }

    /**
     * Builds a one-line progress summary: state label, read and write record/byte
     * counters, remaining input and output tasks, and seconds elapsed since the
     * communication's start time.
     *
     * <p>The write-succeed counters shown here are the ones populated by
     * {@link #getReportCommunication}.
     *
     * @param communication communication whose counters are rendered
     * @param jobContext    context providing the input and output executor task queues
     * @return the formatted status line
     */
    public static String getStateInfo(Communication communication, JobContext jobContext) {
        long elapsedSeconds = (System.currentTimeMillis() - communication.getStartTime()) / 1000;
        return String.format("任务状态:%s       | 输入数据总量:%d/record %d/byte     | 输出数据总量:%d/record %d/byte     | 剩余输入任务:  %d       |剩余输出任务: %d       | 耗时: %d/s ",
                getStatus(communication.getState()),
                communication.getLongCounter(READ_SUCCEED_RECORDS),
                communication.getLongCounter(READ_SUCCEED_BYTES),
                communication.getLongCounter(WRITE_SUCCEED_RECORDS),
                communication.getLongCounter(WRITE_SUCCEED_BYTES),
                Integer.valueOf(jobContext.getInExecutorTaskQueue().getResidueSize()),
                Integer.valueOf(jobContext.getOutExecutorTaskQueue().getResidueSize()),
                Long.valueOf(elapsedSeconds));
    }
}
