A Look at Flink's AbstractTtlState
Published: 2019-06-22


This post takes a look at Flink's AbstractTtlState.

InternalKvState

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/internal/InternalKvState.java

/**
 * The {@code InternalKvState} is the root of the internal state type hierarchy, similar to the
 * {@link State} being the root of the public API state hierarchy.
 *
 * <p>The internal state classes give access to the namespace getters and setters and access to
 * additional functionality, like raw value access or state merging.
 *
 * <p>The public API state hierarchy is intended to be programmed against by Flink applications.
 * The internal state hierarchy holds all the auxiliary methods that are used by the runtime and not
 * intended to be used by user applications. These internal methods are considered of limited use to users and
 * only confusing, and are usually not regarded as stable across releases.
 *
 * <p>Each specific type in the internal state hierarchy extends the type from the public
 * state hierarchy:
 *
 * <pre>
 *             State
 *               |
 *               +-------------------InternalKvState
 *               |                         |
 *          MergingState                   |
 *               |                         |
 *               +-----------------InternalMergingState
 *               |                         |
 *      +--------+------+                  |
 *      |               |                  |
 * ReducingState    ListState        +-----+-----------------+
 *      |               |            |                       |
 *      +-----------+   +-----------   -----------------InternalListState
 *                  |                |
 *                  +---------InternalReducingState
 * </pre>
 *
 * @param <K> The type of key the state is associated to
 * @param <N> The type of the namespace
 * @param <V> The type of values kept internally in state
 */
public interface InternalKvState<K, N, V> extends State {

    TypeSerializer<K> getKeySerializer();

    TypeSerializer<N> getNamespaceSerializer();

    TypeSerializer<V> getValueSerializer();

    void setCurrentNamespace(N namespace);

    byte[] getSerializedValue(
        final byte[] serializedKeyAndNamespace,
        final TypeSerializer<K> safeKeySerializer,
        final TypeSerializer<N> safeNamespaceSerializer,
        final TypeSerializer<V> safeValueSerializer) throws Exception;
}
  • The InternalKvState interface defines the methods that an internal keyed state must implement: getKeySerializer, getNamespaceSerializer, getValueSerializer, setCurrentNamespace, and getSerializedValue.

AbstractTtlState

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/ttl/AbstractTtlState.java

/**
 * Base class for TTL logic wrappers of state objects.
 *
 * @param <K> The type of key the state is associated to
 * @param <N> The type of the namespace
 * @param <SV> The type of values kept internally in state without TTL
 * @param <TTLSV> The type of values kept internally in state with TTL
 * @param <S> Type of originally wrapped state object
 */
abstract class AbstractTtlState<K, N, SV, TTLSV, S extends InternalKvState<K, N, TTLSV>>
    extends AbstractTtlDecorator<S>
    implements InternalKvState<K, N, SV> {

    private final TypeSerializer<SV> valueSerializer;

    AbstractTtlState(S original, StateTtlConfig config, TtlTimeProvider timeProvider, TypeSerializer<SV> valueSerializer) {
        super(original, config, timeProvider);
        this.valueSerializer = valueSerializer;
    }

    <SE extends Throwable, CE extends Throwable, T> T getWithTtlCheckAndUpdate(
        SupplierWithException<TtlValue<T>, SE> getter,
        ThrowingConsumer<TtlValue<T>, CE> updater) throws SE, CE {
        return getWithTtlCheckAndUpdate(getter, updater, original::clear);
    }

    @Override
    public TypeSerializer<K> getKeySerializer() {
        return original.getKeySerializer();
    }

    @Override
    public TypeSerializer<N> getNamespaceSerializer() {
        return original.getNamespaceSerializer();
    }

    @Override
    public TypeSerializer<SV> getValueSerializer() {
        return valueSerializer;
    }

    @Override
    public void setCurrentNamespace(N namespace) {
        original.setCurrentNamespace(namespace);
    }

    @Override
    public byte[] getSerializedValue(
        byte[] serializedKeyAndNamespace,
        TypeSerializer<K> safeKeySerializer,
        TypeSerializer<N> safeNamespaceSerializer,
        TypeSerializer<SV> safeValueSerializer) {
        throw new FlinkRuntimeException("Queryable state is not currently supported with TTL.");
    }

    @Override
    public void clear() {
        original.clear();
    }
}
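  • AbstractTtlState implements the InternalKvState interface and extends AbstractTtlDecorator. It adds a two-argument getWithTtlCheckAndUpdate overload that delegates to AbstractTtlDecorator's getWithTtlCheckAndUpdate, passing original::clear as the stateClear callback, to apply the TTL logic. Its getSerializedValue throws a FlinkRuntimeException, because queryable state is not supported together with TTL in this version.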
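To make the wrapping idea concrete, here is a minimal, self-contained sketch that does not depend on Flink. All names in it (TtlWrapperSketch, Stamped, RawValueState, TtlValueStateSketch) are hypothetical stand-ins, not Flink classes. It also assumes one fixed policy (always refresh the timestamp on read, never return expired values), whereas Flink reads both choices from StateTtlConfig. What it tries to show is how a wrapper can hand the raw state's getter, updater, and clear method to a shared TTL routine as method references, in the spirit of original::clear above.

```java
import java.util.function.Consumer;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

public class TtlWrapperSketch {

    /** Hypothetical stand-in for a timestamped value (not Flink's TtlValue). */
    static final class Stamped<V> {
        final V userValue;
        final long lastAccessTimestamp;

        Stamped(V userValue, long lastAccessTimestamp) {
            this.userValue = userValue;
            this.lastAccessTimestamp = lastAccessTimestamp;
        }
    }

    /** Hypothetical raw state: stores whatever it is given and knows nothing about TTL. */
    static final class RawValueState<V> {
        private V value;

        V value() { return value; }
        void update(V newValue) { value = newValue; }
        void clear() { value = null; }
    }

    /** Wrapper that stores timestamped values but exposes plain user values. */
    static final class TtlValueStateSketch<V> {
        private final RawValueState<Stamped<V>> original;
        private final long ttl;
        private final LongSupplier clock;

        TtlValueStateSketch(RawValueState<Stamped<V>> original, long ttl, LongSupplier clock) {
            this.original = original;
            this.ttl = ttl;
            this.clock = clock;
        }

        V value() {
            // The raw state's own methods are passed in as getter, updater and clear callback.
            return getWithTtlCheckAndUpdate(original::value, original::update, original::clear);
        }

        void update(V userValue) {
            original.update(new Stamped<>(userValue, clock.getAsLong()));
        }

        // Assumed fixed policy: refresh on read, never return expired values.
        // Overflow of timestamp + ttl is ignored here to keep the sketch short.
        private V getWithTtlCheckAndUpdate(
                Supplier<Stamped<V>> getter, Consumer<Stamped<V>> updater, Runnable stateClear) {
            Stamped<V> stamped = getter.get();
            if (stamped == null) {
                return null;
            }
            long now = clock.getAsLong();
            if (stamped.lastAccessTimestamp + ttl <= now) {
                stateClear.run();
                return null;
            }
            updater.accept(new Stamped<>(stamped.userValue, now));
            return stamped.userValue;
        }
    }

    public static void main(String[] args) {
        long[] now = {0L};
        RawValueState<Stamped<String>> raw = new RawValueState<>();
        TtlValueStateSketch<String> state = new TtlValueStateSketch<>(raw, 10L, () -> now[0]);

        state.update("a");                  // stored with timestamp 0
        now[0] = 5L;
        System.out.println(state.value());  // a (timestamp refreshed to 5)
        now[0] = 20L;
        System.out.println(state.value());  // null (5 + 10 <= 20, so it expired)
        System.out.println(raw.value());    // null (the raw state was cleared)
    }
}
```

The caller of TtlValueStateSketch never sees a timestamp; only the wrapper and the raw state deal with the Stamped type, which is the same split that the SV and TTLSV type parameters express in AbstractTtlState.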

AbstractTtlDecorator

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java

/**
 * Base class for TTL logic wrappers.
 *
 * @param <T> Type of originally wrapped object
 */
abstract class AbstractTtlDecorator<T> {
    /** Wrapped original state handler. */
    final T original;

    final StateTtlConfig config;

    final TtlTimeProvider timeProvider;

    /** Whether to renew expiration timestamp on state read access. */
    final boolean updateTsOnRead;

    /** Whether to renew expiration timestamp on state read access. */
    final boolean returnExpired;

    /** State value time to live in milliseconds. */
    final long ttl;

    AbstractTtlDecorator(
        T original,
        StateTtlConfig config,
        TtlTimeProvider timeProvider) {
        Preconditions.checkNotNull(original);
        Preconditions.checkNotNull(config);
        Preconditions.checkNotNull(timeProvider);
        this.original = original;
        this.config = config;
        this.timeProvider = timeProvider;
        this.updateTsOnRead = config.getUpdateType() == StateTtlConfig.UpdateType.OnReadAndWrite;
        this.returnExpired = config.getStateVisibility() == StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp;
        this.ttl = config.getTtl().toMilliseconds();
    }

    <V> V getUnexpired(TtlValue<V> ttlValue) {
        return ttlValue == null || (expired(ttlValue) && !returnExpired) ? null : ttlValue.getUserValue();
    }

    <V> boolean expired(TtlValue<V> ttlValue) {
        return TtlUtils.expired(ttlValue, ttl, timeProvider);
    }

    <V> TtlValue<V> wrapWithTs(V value) {
        return TtlUtils.wrapWithTs(value, timeProvider.currentTimestamp());
    }

    <V> TtlValue<V> rewrapWithNewTs(TtlValue<V> ttlValue) {
        return wrapWithTs(ttlValue.getUserValue());
    }

    <SE extends Throwable, CE extends Throwable, CLE extends Throwable, V> V getWithTtlCheckAndUpdate(
        SupplierWithException<TtlValue<V>, SE> getter,
        ThrowingConsumer<TtlValue<V>, CE> updater,
        ThrowingRunnable<CLE> stateClear) throws SE, CE, CLE {
        TtlValue<V> ttlValue = getWrappedWithTtlCheckAndUpdate(getter, updater, stateClear);
        return ttlValue == null ? null : ttlValue.getUserValue();
    }

    <SE extends Throwable, CE extends Throwable, CLE extends Throwable, V> TtlValue<V> getWrappedWithTtlCheckAndUpdate(
        SupplierWithException<TtlValue<V>, SE> getter,
        ThrowingConsumer<TtlValue<V>, CE> updater,
        ThrowingRunnable<CLE> stateClear) throws SE, CE, CLE {
        TtlValue<V> ttlValue = getter.get();
        if (ttlValue == null) {
            return null;
        } else if (expired(ttlValue)) {
            stateClear.run();
            if (!returnExpired) {
                return null;
            }
        } else if (updateTsOnRead) {
            updater.accept(rewrapWithNewTs(ttlValue));
        }
        return ttlValue;
    }
}
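  • AbstractTtlDecorator encapsulates the TTL logic, most of which lives in getWrappedWithTtlCheckAndUpdate. On each access that finds a non-null value, it first checks whether the value has expired (TtlUtils.expired(ttlValue, ttl, timeProvider)). If it has, it calls stateClear (a ThrowingRunnable, here original::clear) and returns null, unless returnExpired is set, in which case the expired value is still returned after the state has been cleared. If the value has not expired and updateTsOnRead is set, it calls updater with the value rewrapped under a fresh timestamp. It then returns ttlValue.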
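The interplay of the two flags is easier to see when the branches are exercised one by one. The following is a simplified, Flink-free sketch of the same branching; TtlReadSketch, Stamped, Slot, and read are hypothetical names, the current time is passed in as a plain long, and the overflow handling of the real expiry check is left out.

```java
public class TtlReadSketch {

    /** Hypothetical stand-in for a timestamped value (not Flink's TtlValue). */
    static final class Stamped<V> {
        final V userValue;
        final long lastAccessTimestamp;

        Stamped(V userValue, long lastAccessTimestamp) {
            this.userValue = userValue;
            this.lastAccessTimestamp = lastAccessTimestamp;
        }
    }

    /** Hypothetical one-slot store that plays the role of the wrapped state. */
    static final class Slot<V> {
        Stamped<V> stored;
    }

    /** Same branching as described above, with the two flags passed in explicitly. */
    static <V> V read(Slot<V> slot, long ttl, long now, boolean updateTsOnRead, boolean returnExpired) {
        Stamped<V> current = slot.stored;
        if (current == null) {
            return null;
        } else if (current.lastAccessTimestamp + ttl <= now) {
            slot.stored = null;                 // corresponds to stateClear.run()
            if (!returnExpired) {
                return null;
            }
        } else if (updateTsOnRead) {
            slot.stored = new Stamped<>(current.userValue, now);  // corresponds to updater.accept(...)
        }
        return current.userValue;
    }

    public static void main(String[] args) {
        Slot<String> slot = new Slot<>();

        // Live value, updateTsOnRead = true: the value is returned and its timestamp refreshed.
        slot.stored = new Stamped<>("a", 0L);
        System.out.println(read(slot, 10L, 5L, true, false));    // a
        System.out.println(slot.stored.lastAccessTimestamp);     // 5

        // Expired value, returnExpired = false: the state is cleared and null is returned.
        System.out.println(read(slot, 10L, 15L, true, false));   // null
        System.out.println(slot.stored == null);                 // true

        // Expired value, returnExpired = true: the state is cleared, but the old value is still returned.
        slot.stored = new Stamped<>("b", 0L);
        System.out.println(read(slot, 10L, 10L, false, true));   // b
        System.out.println(slot.stored == null);                 // true
    }
}
```

The last case is the one that is easy to miss: with returnExpired set, a read can return a value and remove it from the state in the same call, so a second read finds nothing.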

TtlUtils.expired

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/ttl/TtlUtils.java

/** Common functions related to State TTL. */
class TtlUtils {
    static <V> boolean expired(@Nullable TtlValue<V> ttlValue, long ttl, TtlTimeProvider timeProvider) {
        return expired(ttlValue, ttl, timeProvider.currentTimestamp());
    }

    static <V> boolean expired(@Nullable TtlValue<V> ttlValue, long ttl, long currentTimestamp) {
        return ttlValue != null && expired(ttlValue.getLastAccessTimestamp(), ttl, currentTimestamp);
    }

    private static boolean expired(long ts, long ttl, long currentTimestamp) {
        return getExpirationTimestamp(ts, ttl) <= currentTimestamp;
    }

    private static long getExpirationTimestamp(long ts, long ttl) {
        long ttlWithoutOverflow = ts > 0 ? Math.min(Long.MAX_VALUE - ts, ttl) : ttl;
        return ts + ttlWithoutOverflow;
    }

    //......
}
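  • TtlUtils.expired computes the expiration time with getExpirationTimestamp and compares it with currentTimestamp; a value counts as expired once its expiration time is less than or equal to the current time. getExpirationTimestamp adds ttl to ttlValue.getLastAccessTimestamp(), and when the timestamp is positive it caps ttl at Long.MAX_VALUE - ts so that the sum cannot overflow the range of long.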
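The effect of the cap is easiest to see next to the naive sum. The sketch below copies the two private helpers from the listing into a standalone class (ExpirationSketch is a hypothetical name, and the method names are shortened) and compares them with plain addition for a timestamp close to Long.MAX_VALUE.

```java
public class ExpirationSketch {

    /** Same arithmetic as getExpirationTimestamp in the listing above. */
    static long expirationTimestamp(long ts, long ttl) {
        long ttlWithoutOverflow = ts > 0 ? Math.min(Long.MAX_VALUE - ts, ttl) : ttl;
        return ts + ttlWithoutOverflow;
    }

    /** A value is expired once its expiration timestamp is at or before the current time. */
    static boolean expired(long ts, long ttl, long currentTimestamp) {
        return expirationTimestamp(ts, ttl) <= currentTimestamp;
    }

    public static void main(String[] args) {
        // Ordinary case: last access at 1000 with a TTL of 500 expires at 1500.
        System.out.println(expirationTimestamp(1000L, 500L));  // 1500
        System.out.println(expired(1000L, 500L, 1499L));       // false
        System.out.println(expired(1000L, 500L, 1500L));       // true

        // Near the top of the long range the naive sum wraps around to a negative number,
        // which would make the value look as if it had expired long ago.
        long lateTs = Long.MAX_VALUE - 10L;
        System.out.println(lateTs + 500L < 0);                 // true

        // The capped version saturates at Long.MAX_VALUE instead.
        System.out.println(expirationTimestamp(lateTs, 500L) == Long.MAX_VALUE);  // true
        System.out.println(expired(lateTs, 500L, lateTs + 1L));                   // false
    }
}
```

The cap is only applied for positive timestamps, because adding a non-negative TTL to a zero or negative timestamp cannot exceed Long.MAX_VALUE.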

ThrowingRunnable

flink-core-1.7.0-sources.jar!/org/apache/flink/util/function/ThrowingRunnable.java

/**
 * Similar to a {@link Runnable}, this interface is used to capture a block of code
 * to be executed. In contrast to {@code Runnable}, this interface allows throwing
 * checked exceptions.
 */
@PublicEvolving
@FunctionalInterface
public interface ThrowingRunnable<E extends Throwable> {

    /**
     * The work method.
     *
     * @throws E Exceptions may be thrown.
     */
    void run() throws E;

    /**
     * Converts a {@link ThrowingRunnable} into a {@link Runnable} which throws all checked exceptions
     * as unchecked.
     *
     * @param throwingRunnable to convert into a {@link Runnable}
     * @return {@link Runnable} which throws all checked exceptions as unchecked.
     */
    static Runnable unchecked(ThrowingRunnable<?> throwingRunnable) {
        return () -> {
            try {
                throwingRunnable.run();
            } catch (Throwable t) {
                ExceptionUtils.rethrow(t);
            }
        };
    }
}
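  • stateClear is a ThrowingRunnable. Unlike Runnable, a ThrowingRunnable may throw checked exceptions. Its static unchecked method converts a ThrowingRunnable into a Runnable: through ExceptionUtils.rethrow, any Throwable that is neither an Error nor a RuntimeException is wrapped in a RuntimeException and rethrown.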
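The conversion can be tried out without Flink on the classpath. In the sketch below, CheckedRunnable, unchecked, and caught are hypothetical names, and the catch block inlines the rethrow rule described above (Error and RuntimeException pass through, everything else is wrapped) as an assumption about what ExceptionUtils.rethrow does.

```java
import java.io.IOException;

public class UncheckedSketch {

    /** Hypothetical local stand-in for a Runnable that may throw a checked exception. */
    @FunctionalInterface
    interface CheckedRunnable<E extends Throwable> {
        void run() throws E;
    }

    /** Adapts a CheckedRunnable to a plain Runnable. */
    static Runnable unchecked(CheckedRunnable<?> checkedRunnable) {
        return () -> {
            try {
                checkedRunnable.run();
            } catch (Throwable t) {
                // Assumed rethrow rule: unchecked throwables pass through, the rest get wrapped.
                if (t instanceof Error) {
                    throw (Error) t;
                }
                if (t instanceof RuntimeException) {
                    throw (RuntimeException) t;
                }
                throw new RuntimeException(t);
            }
        };
    }

    /** Test helper: runs the Runnable and returns whatever it threw, or null. */
    static Throwable caught(Runnable runnable) {
        try {
            runnable.run();
            return null;
        } catch (Throwable t) {
            return t;
        }
    }

    public static void main(String[] args) {
        CheckedRunnable<IOException> failing = () -> {
            throw new IOException("disk");
        };
        CheckedRunnable<RuntimeException> illegal = () -> {
            throw new IllegalStateException("bad state");
        };

        Throwable wrapped = caught(unchecked(failing));
        System.out.println(wrapped.getClass().getSimpleName());             // RuntimeException
        System.out.println(wrapped.getCause().getClass().getSimpleName());  // IOException

        Throwable passedThrough = caught(unchecked(illegal));
        System.out.println(passedThrough.getClass().getSimpleName());       // IllegalStateException
    }
}
```

The original exception stays reachable through getCause(), so callers that need the checked exception back can still unwrap it.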

Summary

  • The InternalKvState interface defines the methods that an internal keyed state must implement: getKeySerializer, getNamespaceSerializer, getValueSerializer, setCurrentNamespace, and getSerializedValue.
  • AbstractTtlState implements InternalKvState and extends AbstractTtlDecorator. Its getWithTtlCheckAndUpdate overload delegates to AbstractTtlDecorator's getWithTtlCheckAndUpdate, with original::clear as the stateClear callback, to apply the TTL logic.
  • AbstractTtlDecorator's getWrappedWithTtlCheckAndUpdate checks every non-null value it reads for expiry (TtlUtils.expired(ttlValue, ttl, timeProvider)). An expired value triggers stateClear (a ThrowingRunnable, here original::clear) and yields null unless returnExpired is set. A live value is handed to updater with a fresh timestamp when updateTsOnRead is set, and ttlValue is then returned.

Reposted from: http://whcaa.baihongyu.com/
