Channel 原理
Channel是Reader和Writer的通信组件。Reader向channle写入数据,Writer从channel读取数据。channel还提供了限速的功能,支持数据大小(字节数), 数据条数。
写入数据
Channel提供push方法,给Reader调用,写入数据。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
public void push(final Record r) {
Validate.notNull(r, "record不能为空.");
// 子类实现doPush方法
this.doPush(r);
// statPush会进行限速
this.statPush(1L, r.getByteSize());
}
public void pushAll(final Collection<Record> rs) {
Validate.notNull(rs);
Validate.noNullElements(rs);
// 子类实现doPush方法
this.doPushAll(rs);
// statPush会进行限速
this.statPush(rs.size(), this.getByteSize(rs));
}
|
statPush里面会对速度进行控制。它通过Communication记录总的写入数据大小和数据条数。然后每隔一段时间,检查速度。如果速度过快,就会sleep一段时间,来把速度降下来。
CommunicationTool 提供方法,从Communication计算读取的字节数,条数
1
2
3
4
5
6
7
8
9
10
11
|
public final class CommunicationTool {
public static long getTotalReadRecords(final Communication communication) {
return communication.getLongCounter(READ_SUCCEED_RECORDS) +
communication.getLongCounter(READ_FAILED_RECORDS);
}
public static long getTotalReadBytes(final Communication communication) {
return communication.getLongCounter(READ_SUCCEED_BYTES) +
communication.getLongCounter(READ_FAILED_BYTES);
}
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
|
private void statPush(long recordSize, long byteSize) {
// currentCommunication实时记录了Reader读取的总数据字节数和条数
currentCommunication.increaseCounter(CommunicationTool.READ_SUCCEED_RECORDS,
recordSize);
currentCommunication.increaseCounter(CommunicationTool.READ_SUCCEED_BYTES,
byteSize);
......
// 判断是否会限速
boolean isChannelByteSpeedLimit = (this.byteSpeed > 0);
boolean isChannelRecordSpeedLimit = (this.recordSpeed > 0);
if (!isChannelByteSpeedLimit && !isChannelRecordSpeedLimit) {
return;
}
// lastCommunication记录最后一次的时间
long lastTimestamp = lastCommunication.getTimestamp();
long nowTimestamp = System.currentTimeMillis();
long interval = nowTimestamp - lastTimestamp;
// 每隔flowControlInterval一段时间,就会检查是否超速
if (interval - this.flowControlInterval >= 0) {
long byteLimitSleepTime = 0;
long recordLimitSleepTime = 0;
if (isChannelByteSpeedLimit) {
// 计算速度,(现在的字节数 - 上一次的字节数) / 过去的时间
long currentByteSpeed = (CommunicationTool.getTotalReadBytes(currentCommunication) -
CommunicationTool.getTotalReadBytes(lastCommunication)) * 1000 / interval;
if (currentByteSpeed > this.byteSpeed) {
// 计算根据byteLimit得到的休眠时间,
// 这段时间传输的字节数 / 期望的限定速度 - 这段时间
byteLimitSleepTime = currentByteSpeed * interval / this.byteSpeed
- interval;
}
}
if (isChannelRecordSpeedLimit) {
long currentRecordSpeed = (CommunicationTool.getTotalReadRecords(currentCommunication) -
CommunicationTool.getTotalReadRecords(lastCommunication)) * 1000 / interval;
if (currentRecordSpeed > this.recordSpeed) {
// 计算根据recordLimit得到的休眠时间
recordLimitSleepTime = currentRecordSpeed * interval / this.recordSpeed
- interval;
}
}
// 休眠时间取较大值
long sleepTime = byteLimitSleepTime < recordLimitSleepTime ?
recordLimitSleepTime : byteLimitSleepTime;
if (sleepTime > 0) {
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
// 保存读取字节数
lastCommunication.setLongCounter(CommunicationTool.READ_SUCCEED_BYTES,
currentCommunication.getLongCounter(CommunicationTool.READ_SUCCEED_BYTES));
// 保存读取失败的字节数
lastCommunication.setLongCounter(CommunicationTool.READ_FAILED_BYTES,
currentCommunication.getLongCounter(CommunicationTool.READ_FAILED_BYTES));
// 保存读取条数
lastCommunication.setLongCounter(CommunicationTool.READ_SUCCEED_RECORDS,
currentCommunication.getLongCounter(CommunicationTool.READ_SUCCEED_RECORDS));
// 保存读取失败的条数
lastCommunication.setLongCounter(CommunicationTool.READ_FAILED_RECORDS,
currentCommunication.getLongCounter(CommunicationTool.READ_FAILED_RECORDS));
// 记录保存的时间点
lastCommunication.setTimestamp(nowTimestamp);
}
}
|
读取数据
Channel提供pull方法,给Writer调用,读取数据。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
public Record pull() {
// 子类实现doPull方法,返回数据
Record record = this.doPull();
// 调用statPull方法,更新统计数据
this.statPull(1L, record.getByteSize());
return record;
}
public void pullAll(final Collection<Record> rs) {
Validate.notNull(rs);
// 子类实现doPullAll方法,返回数据
this.doPullAll(rs);
// 调用statPull方法,更新统计数据
this.statPull(rs.size(), this.getByteSize(rs));
}
|
statPull方法,并没有限速。因为数据的整个流程是Reader -》 Channle -》 Writer, Reader的push速度限制了,Writer的pull速度也就没必要限速
1
2
3
4
5
6
|
private void statPull(long recordSize, long byteSize) {
currentCommunication.increaseCounter(
CommunicationTool.WRITE_RECEIVED_RECORDS, recordSize);
currentCommunication.increaseCounter(
CommunicationTool.WRITE_RECEIVED_BYTES, byteSize);
}
|
MemoryChannel 原理
目前Channel的子类只有MemoryChannel。MemoryChannel实现了doPush和doPull方法。它本质是将数据放进ArrayBlockingQueue。
先看看MemoryChannel的一些属性
1
2
3
4
5
6
7
8
9
10
11
12
|
public class MemoryChannel extends Channel {
// 等待Reader处理完的时间,也就是pull的时间,继承自Channel
protected volatile long waitReaderTime = 0;
// 等待Writer处理完的时间,也就是push的时间,继承自Channel
protected volatile long waitWriterTime = 0;
// Channel里面保存的数据大小
private AtomicInteger memoryBytes = new AtomicInteger(0);
// 存放记录的queue
private ArrayBlockingQueue<Record> queue = null;
}
|
读写单条数据
首先看push和pull单条的情况
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
protected void doPush(Record r) {
try {
long startTime = System.nanoTime();
// ArrayBlockingQueue提供了阻塞的put方法,写入数据
this.queue.put(r);
// 记录写入push花费的时间
waitWriterTime += System.nanoTime() - startTime;
// 更新Channle里数据的字节数
memoryBytes.addAndGet(r.getMemorySize());
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
protected Record doPull() {
try {
long startTime = System.nanoTime();
// ArrayBlockingQueue提供了阻塞的take方法,读取入数据
Record r = this.queue.take();
// 记录写入pull花费的时间
waitReaderTime += System.nanoTime() - startTime;
// 更新Channle里数据的字节数
memoryBytes.addAndGet(-r.getMemorySize());
return r;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException(e);
}
}
|
读写多条数据
再看看push和pull多条的情况,这种方法比起单条的效率更高。 因为ArrayBlockingQueue没有提供批量操作的阻塞方法,所以需要自己使用条件锁。
先看看相关的一些属性
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
public class MemoryChannel extends Channel {
// 递归锁
private ReentrantLock lock;
// 条件信号
private Condition notInsufficient, notEmpty;
// 一次从Channel的pull的数据条数
private int bufferSize = 0;
// 数据的字节数容量
protected int byteCapacity;
public MemoryChannel(final Configuration configuration) {
.......
this.bufferSize = configuration.getInt(CoreConstant.DATAX_CORE_TRANSPORT_EXCHANGER_BUFFERSIZE);
// 初始化锁
lock = new ReentrantLock();
notInsufficient = lock.newCondition();
notEmpty = lock.newCondition();
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
|
protected void doPullAll(Collection<Record> rs) {
assert rs != null;
rs.clear();
try {
long startTime = System.nanoTime();
// 获取锁
lock.lockInterruptibly();
// 从queue里面取出数据,最多bufferSize条
while (this.queue.drainTo(rs, bufferSize) <= 0) {
// 如果queue里面没有数据,就等待notEmpty信号
notEmpty.await(200L, TimeUnit.MILLISECONDS);
}
// 更新pull的时间
waitReaderTime += System.nanoTime() - startTime;
int bytes = getRecordBytes(rs);
// 更新数据的字节数
memoryBytes.addAndGet(-bytes);
// 通知可以push数据的信号
notInsufficient.signalAll();
} catch (InterruptedException e) {
throw DataXException.asDataXException(
FrameworkErrorCode.RUNTIME_ERROR, e);
} finally {
lock.unlock();
}
}
protected void doPushAll(Collection<Record> rs) {
try {
long startTime = System.nanoTime();
// 获取锁
lock.lockInterruptibly();
int bytes = getRecordBytes(rs);
while (memoryBytes.get() + bytes > this.byteCapacity || rs.size() > this.queue.remainingCapacity()) {
// 如果新增数据,会造成数据字节数超过指定容量, 或者超过了queue的容量,就会一直等待notInsufficient信号
notInsufficient.await(200L, TimeUnit.MILLISECONDS);
}
// 向queue里添加数据
this.queue.addAll(rs);
// 更新push的时间
waitWriterTime += System.nanoTime() - startTime;
// 更新数据的字节数
memoryBytes.addAndGet(bytes);
// 通知可以pull数据的信号
notEmpty.signalAll();
} catch (InterruptedException e) {
throw DataXException.asDataXException(
FrameworkErrorCode.RUNTIME_ERROR, e);
} finally {
lock.unlock();
}
}
|