Flink: SQL: Make Dynamic sink options to be configurable in SQL #15780
swapna267 wants to merge 4 commits into apache:main from
Conversation
```java
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.iceberg.flink;
```
Should probably be in the dynamic package. Or should we create a config package?
```java
FlinkDynamicSinkConf flinkDynamicSinkConf =
    new FlinkDynamicSinkConf(writeProperties, flinkConfig);
```
Can we directly pass FlinkDynamicSinkConf to the constructor?
```java
writeOptions.put(
    FlinkDynamicSinkOptions.IMMEDIATE_TABLE_UPDATE.key(),
    Boolean.toString(newImmediateUpdate));
```
I'm not sure this should go into WriteOptions. I think it is better to have a separate config for DynamicSink options.
With all of them written into WriteOptions, it is easier to pass these configs from SQL by using setAll(Map<String, String> properties) for DynamicIcebergSink initialization.
If we separate them, we either need to handle it in setAll, or upstream users need to provide them separately.
Since the dynamic sink configs are scoped with the prefix `dynamic-sink`, should it be OK for them to go in the same map?
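The prefix-scoping argument above can be sketched in plain Java. This is a hypothetical illustration (the class, method, and key names are mine, not the PR's API) of how dynamic-sink options can share one map with regular write options and still be separated later by their prefix:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: dynamic-sink options share one flat map with write
// options, disambiguated by a "dynamic-sink." prefix, so a single
// setAll(Map<String, String>) call can carry both from SQL.
public class PrefixScopedOptions {
  static final String DYNAMIC_SINK_PREFIX = "dynamic-sink.";

  // Extract the entries under the given prefix, stripping the prefix from keys.
  static Map<String, String> extractPrefixed(Map<String, String> all, String prefix) {
    Map<String, String> out = new HashMap<>();
    for (Map.Entry<String, String> e : all.entrySet()) {
      if (e.getKey().startsWith(prefix)) {
        out.put(e.getKey().substring(prefix.length()), e.getValue());
      }
    }
    return out;
  }

  public static void main(String[] args) {
    Map<String, String> options = new HashMap<>();
    options.put("write-parallelism", "4");                      // regular write option
    options.put("dynamic-sink.immediate-table-update", "true"); // dynamic-sink scoped
    Map<String, String> dynamic = extractPrefixed(options, DYNAMIC_SINK_PREFIX);
    System.out.println(dynamic.get("immediate-table-update")); // prints "true"
  }
}
```

The prefix keeps the two option families from colliding even though they travel through the same map.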
Thinking about it again, it's probably ok.
```java
FlinkDynamicSinkConf flinkDynamicSinkConf =
    new FlinkDynamicSinkConf(writeProperties, flinkConfig);
```
Could we create the config only once and pass it to the constructor?
I did that and then removed it for consistency.
DynamicRecordProcessor needs FlinkDynamicSinkConf and also writeProperties/flinkConfig.
WriteProperties and FlinkConfig are required to create FlinkWriteConf in open(), as FlinkWriteConf is not serializable.
Maybe I can simply pass FlinkDynamicSinkConf along with writeProperties/flinkConfig.
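The serializability constraint described above follows a common Flink pattern: ship only serializable raw inputs with the operator and rebuild the non-serializable conf object in open(). A minimal stand-alone sketch, with placeholder names (LazyConfOperator and buildConf are mine, not the PR's code):

```java
import java.io.Serializable;
import java.util.Map;

// Hypothetical sketch: keep serializable raw inputs (write properties),
// derive the non-serializable conf lazily after deserialization, the way a
// Flink operator would in open().
public class LazyConfOperator implements Serializable {
  private final Map<String, String> writeProperties; // serializable input
  private transient Object flinkWriteConf;           // derived, not serializable

  public LazyConfOperator(Map<String, String> writeProperties) {
    this.writeProperties = writeProperties;
  }

  // Called on the task manager after deserialization, like Flink's open().
  public void open() {
    this.flinkWriteConf = buildConf(writeProperties);
  }

  private static Object buildConf(Map<String, String> props) {
    // Stand-in for `new FlinkWriteConf(writeProperties, flinkConfig)`.
    return "conf(" + props + ")";
  }

  public Object conf() {
    return flinkWriteConf;
  }

  public static void main(String[] args) {
    LazyConfOperator op = new LazyConfOperator(Map.of("write-parallelism", "4"));
    op.open(); // conf exists only after open()
    System.out.println(op.conf() != null);
  }
}
```

The `transient` field never crosses the serialization boundary; only the property maps do.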
```java
if (super.writeParallelism() != Integer.MAX_VALUE) {
  return super.writeParallelism();
}
```
Not sure about this logic. The default for writeParallelism is 0.
The DynamicRecord constructor has writeParallelism as a primitive int, and we basically use Integer.MAX_VALUE to fall back to another value, like the job parallelism.
```java
 * @param writeParallelism The number of parallel writers. Can be set to any value {@literal > 0},
 *     but will always be automatically capped by the maximum write parallelism, which is the
 *     parallelism of the sink. Set to Integer.MAX_VALUE for always using the maximum available
 *     write parallelism.
```
We have a similar issue with upsertMode, as it uses a primitive boolean.
True, the user supplies this value via the constructor. I initially thought that we were calling a non-existent empty constructor.
Regardless, I'm not sure about this change. The docs state that Integer.MAX_VALUE means using the max write parallelism, but we cap it at the writeParallelism supplied by the FlinkWriteConf; that does not seem right.
I think the only way to address this is to use a value like 0 to trigger loading from FlinkWriteConf.
If not overridden in FlinkWriteConf, we still use Integer.MAX_VALUE.
But I agree on falling back to the config when it is explicitly set to <= 0.
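The fallback rules discussed in this thread could be sketched as follows. This is a hypothetical resolution helper, not the PR's code: Integer.MAX_VALUE means "use the maximum available write parallelism", a non-positive value falls back to the configured parallelism, and any explicit value is capped by the max, as the Javadoc quoted above describes:

```java
// Hypothetical sketch of the sentinel-based fallback discussed above.
public class ParallelismResolver {
  static int resolve(int recordParallelism, int confParallelism, int maxParallelism) {
    if (recordParallelism == Integer.MAX_VALUE) {
      return maxParallelism;            // documented meaning: always use the max
    }
    if (recordParallelism <= 0) {
      return Math.min(confParallelism, maxParallelism); // fall back to FlinkWriteConf
    }
    return Math.min(recordParallelism, maxParallelism); // explicit value, capped
  }

  public static void main(String[] args) {
    System.out.println(resolve(Integer.MAX_VALUE, 4, 8)); // 8
    System.out.println(resolve(0, 4, 8));                 // 4
    System.out.println(resolve(16, 4, 8));                // 8
  }
}
```

Keeping the two sentinels (MAX_VALUE for "max", <= 0 for "load from config") distinct avoids the capping contradiction the reviewer points out.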
```java
 * have fields set.
 */
@Internal
class DynamicRecordWithDefaults extends DynamicRecord {
```
```diff
-class DynamicRecordWithDefaults extends DynamicRecord {
+class DynamicRecordWithConfig extends DynamicRecord {
```
Should we add a test to verify config handling?
```java
super(
    source.tableIdentifier(),
    source.branch(),
    source.schema(),
    source.rowData(),
    source.spec(),
    source.distributionMode(),
    source.writeParallelism());
```
Could we just resolve the adjusted arguments here? We wouldn't have to overwrite all the methods then. Seems cleaner to me.
```diff
 @Override
-public void collect(DynamicRecord data) {
+public void collect(DynamicRecord inputData) {
+  DynamicRecordWithDefaults data = new DynamicRecordWithDefaults(inputData, flinkWriteConf);
```
I don't like that we are creating another copy of the DynamicRecord here (DynamicRecordWithDefaults). Only a couple of lines later, we also create a DynamicRecordInternal. Could we simply resolve the parts of DynamicRecord which can be overridden? Only a subset of the values is actually overridable.
I started that way, but felt that approach was a little error-prone for the future, as configs of DynamicRecord need to be overridden in different places for different usages.
Write parallelism / distribution mode overrides need to be passed to the HashKeyGenerator.
And overrides like branch/upsertMode should be passed to DynamicRecordInternal.
So the approach would look like:
- Build HashKeyGenerator with defaults for write parallelism / distribution mode.
- Update all DynamicRecordInternal construction paths with defaults for branch.
Another option could be to incorporate this into DynamicRecord itself by adding setConfig(FlinkWriteConf flinkWriteConf).
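The trade-off above (a wrapper record vs. resolving the overridable fields once at the source) can be illustrated with a small stand-alone sketch; all class and field names here are placeholders, not the PR's types:

```java
// Hypothetical sketch: resolve unset overridable fields once, so every
// downstream consumer (hash key generator, internal record) sees the same
// resolved values without a wrapper subclass.
public class DefaultResolution {
  static final class Record {
    String branch;        // null means "not set on the record"
    int writeParallelism; // <= 0 means "not set on the record"
  }

  static final class Defaults {
    final String branch;
    final int writeParallelism;

    Defaults(String branch, int writeParallelism) {
      this.branch = branch;
      this.writeParallelism = writeParallelism;
    }
  }

  // Fill in only the fields the record left unset.
  static void applyDefaults(Record r, Defaults d) {
    if (r.branch == null) {
      r.branch = d.branch;
    }
    if (r.writeParallelism <= 0) {
      r.writeParallelism = d.writeParallelism;
    }
  }

  public static void main(String[] args) {
    Record r = new Record(); // branch unset, parallelism unset
    applyDefaults(r, new Defaults("main", 4));
    System.out.println(r.branch + " / " + r.writeParallelism); // main / 4
  }
}
```

The reviewer's concern is that a one-time resolution like this avoids allocating a second record per element; the author's counterpoint is that the defaults then have to be threaded through every construction path.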
@swapna267 I would like to get rid of
Support the following configs to be configurable from SQL for the dynamic sink.

Fall back to write properties or the Flink configuration if the following are not set on DynamicRecord:

- writeParallelism(int) → FlinkWriteOptions.WRITE_PARALLELISM
- distributionMode → FlinkWriteOptions.DISTRIBUTION_MODE
- toBranch(String) → FlinkWriteOptions.BRANCH

Provide options to configure the following behavior of the dynamic sink in SQL:

- cacheMaxSize(int)
- immediateTableUpdate(boolean)
- dropUnusedColumns(boolean)
- cacheRefreshMs(long)
- inputSchemasPerTableCacheMaxSize(int)
- caseSensitive(boolean)

More context here: #15471 (comment)
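As a rough illustration of how the listed options might be read from a flat properties map, here is a stand-alone sketch; the key strings are guesses derived from the option names above, not the PR's actual keys:

```java
import java.util.Map;

// Hypothetical sketch: typed lookups with defaults for dynamic-sink options
// carried in a flat String-to-String properties map. Key names are
// illustrative, derived from the option names in the PR description.
public class DynamicSinkOptionsSketch {
  static int intOption(Map<String, String> props, String key, int dflt) {
    String v = props.get(key);
    return v == null ? dflt : Integer.parseInt(v);
  }

  static boolean boolOption(Map<String, String> props, String key, boolean dflt) {
    String v = props.get(key);
    return v == null ? dflt : Boolean.parseBoolean(v);
  }

  public static void main(String[] args) {
    Map<String, String> props =
        Map.of(
            "dynamic-sink.cache-max-size", "200",
            "dynamic-sink.immediate-table-update", "false");
    System.out.println(intOption(props, "dynamic-sink.cache-max-size", 100));           // 200
    System.out.println(boolOption(props, "dynamic-sink.immediate-table-update", true)); // false
    System.out.println(boolOption(props, "dynamic-sink.case-sensitive", false));        // false (default)
  }
}
```

In the real connector these would be ConfigOption definitions rather than ad-hoc parsing, but the lookup-with-default shape is the same.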