View Issue Details
ID | Project | Category | View Status | Date Submitted | Last Update |
---|---|---|---|---|---|
0005455 | SymmetricDS | Bug | public | 2022-09-13 07:39 | 2025-01-10 05:43 |
Reporter | qii | Assigned To | elong | ||
Priority | normal | ||||
Status | assigned | Resolution | open | ||
Product Version | 3.14.1 | ||||
Summary | 0005455: get trigger bug, first delete and then insert data with clob,its not sync | ||||
Description | i test version 3.14 first delete and then insert data clob data did not sync, i guess its same reason in 3.12 when insert it use delete trigger :no_binary_old_data :false use database: oracle 11g; table has clob column, first delete and then insert , data clob insert '\b' not true value. | ||||
Steps To Reproduce | table has clob column, first delete and then insert , data clob insert '\b' not true value. | ||||
Tags | large objects, trigger | ||||
|
Do you mean LONG data type? Oracle triggers cannot access a LONG data type using the "NEW" correlation name. (Documented here: https://docs.oracle.com/cd/E11882_01/appdev.112/e25519/triggers.htm#LNPLS758 ) Because of that restriction, SymmetricDS records a special escape character of "\b" that will cause the LONG data to be queried on extraction of the batch. If you inspect the batch data from the staging on disk, you will see the actual LONG data there. (Staging is under the "tmp" sub-directory and the file has the batch number in it.) So, it is working as designed to deal with Oracle's restrictions. If you need to see the data captured in sym_data for some reason, then use CLOB instead. |
|
I know this, first delete and then insert data with clob ,in 'tmp' directory, the value in Long data type is '\b', not real data. beacuse when insert ,its no_binary_old_data :false,not true. use sym_trigger set use_capture_lobs = 1, it work . but i think even use sym_trigger set use_capture_lobs = 0,first delete and then insert data with clob,it will be true data to sync. you can not use delete trigger config to insert value. |
|
Can you give steps to reproduce it? Here is what I tried it's working correctly. CREATE TABLE MYCLOB( ID NUMBER(38) NOT NULL PRIMARY KEY, DESCRIPTION CLOB ); insert into myclob values (1, 'hello, hi'); vi Test.java import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; public class Test { public static void main(String[] args) throws Exception { int count = 0; Class.forName("oracle.jdbc.driver.OracleDriver"); Connection c = DriverManager.getConnection("jdbc:oracle:thin:@127.0.0.1:1521:xe", "elong", "secret"); PreparedStatement s = c.prepareStatement("insert into myclob values (?, ?)"); s.setInt(1, 2); StringBuilder sb = new StringBuilder(); for (int i = 0; i < 4100; i++) { sb.append("a"); } s.setString(2, sb.toString()); count = s.executeUpdate(); System.out.println("Updated " + count + " rows"); s.close(); c.close(); } } javac Test.java java Test Updated 1 rows select row_data from sym_data where table_name = 'MYCLOB'; "1","hello, hi" "2","aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa[...repeats to 4100 characters...]" |
|
before insert ,you should delete ; public static void main(String[] args) throws Exception { int count = 0; Class.forName("oracle.jdbc.driver.OracleDriver"); Connection c = DriverManager.getConnection("jdbc:oracle:thin:@127.0.0.1:1521:xe", "elong", "secret"); PreparedStatement s = c.prepareStatement("delete myclob where id = 1 "); PreparedStatement s = c.prepareStatement("insert into myclob values (?, ?)"); s.setInt(1, 2); StringBuilder sb = new StringBuilder(); for (int i = 0; i < 4100; i++) { sb.append("a"); } s.setString(2, sb.toString()); count = s.executeUpdate(); System.out.println("Updated " + count + " rows"); s.close(); c.close(); } } and the clob value is '\b'. |
|
We're unable to reproduce this issue, and the code that you provided does not compile. Can you provide the exact steps required to reproduce this issue, including the table definition? |
|
in 3.12 here is my change : line 2465 line 2626-2641 line 2765 trigger has cache in this method,first delete the trigger think this data not has long field. DataExtractorService.java (174,458 bytes)
/** * Licensed to JumpMind Inc under one or more contributor * license agreements. See the NOTICE file distributed * with this work for additional information regarding * copyright ownership. JumpMind Inc licenses this file * to you under the GNU General Public License, version 3.0 (GPLv3) * (the "License"); you may not use this file except in compliance * with the License. * * You should have received a copy of the GNU General Public License, * version 3.0 (GPLv3) along with this library; if not, see * <http://www.gnu.org/licenses/>. * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ package org.jumpmind.symmetric.service.impl; import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.jumpmind.symmetric.common.Constants.LOG_PROCESS_SUMMARY_THRESHOLD; import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.IOException; import java.io.OutputStream; import java.io.Writer; import java.math.BigDecimal; import java.sql.SQLException; import java.sql.Types; import java.util.ArrayList; import java.util.Arrays; import java.util.Calendar; import java.util.Collection; import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.FutureTask; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.zip.ZipException; import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.time.DurationFormatUtils; import org.jumpmind.db.io.DatabaseXmlUtil; import org.jumpmind.db.model.Column; import org.jumpmind.db.model.Database; import org.jumpmind.db.model.ForeignKey; import org.jumpmind.db.model.PlatformColumn; import org.jumpmind.db.model.Reference; import org.jumpmind.db.model.Table; import org.jumpmind.db.platform.DatabaseInfo; import org.jumpmind.db.platform.DatabaseNamesConstants; import org.jumpmind.db.platform.DdlBuilderFactory; import org.jumpmind.db.platform.IDdlBuilder; import org.jumpmind.db.sql.DmlStatement; import org.jumpmind.db.sql.DmlStatement.DmlType; import org.jumpmind.db.sql.ISqlReadCursor; import org.jumpmind.db.sql.ISqlRowMapper; import org.jumpmind.db.sql.ISqlTransaction; import org.jumpmind.db.sql.Row; import org.jumpmind.db.sql.mapper.LongMapper; import org.jumpmind.exception.IoException; import org.jumpmind.symmetric.AbstractSymmetricEngine; import org.jumpmind.symmetric.ISymmetricEngine; import org.jumpmind.symmetric.SymmetricException; import org.jumpmind.symmetric.Version; import org.jumpmind.symmetric.common.Constants; import org.jumpmind.symmetric.common.ErrorConstants; import org.jumpmind.symmetric.common.ParameterConstants; import org.jumpmind.symmetric.common.TableConstants; import org.jumpmind.symmetric.db.ISymmetricDialect; import org.jumpmind.symmetric.io.data.Batch; import org.jumpmind.symmetric.io.data.Batch.BatchType; import org.jumpmind.symmetric.io.data.CsvConstants; import org.jumpmind.symmetric.io.data.CsvData; import org.jumpmind.symmetric.io.data.CsvUtils; import org.jumpmind.symmetric.io.data.DataContext; import org.jumpmind.symmetric.io.data.DataEventType; import org.jumpmind.symmetric.io.data.DataProcessor; import org.jumpmind.symmetric.io.data.IDataProcessorListener; import org.jumpmind.symmetric.io.data.IDataReader; import org.jumpmind.symmetric.io.data.IDataWriter; import org.jumpmind.symmetric.io.data.ProtocolException; import org.jumpmind.symmetric.io.data.reader.DataReaderStatistics; import org.jumpmind.symmetric.io.data.reader.ExtractDataReader; import org.jumpmind.symmetric.io.data.reader.IExtractDataReaderSource; import org.jumpmind.symmetric.io.data.reader.ProtocolDataReader; import org.jumpmind.symmetric.io.data.transform.TransformPoint; import org.jumpmind.symmetric.io.data.transform.TransformTable; import org.jumpmind.symmetric.io.data.writer.DataWriterStatisticConstants; import org.jumpmind.symmetric.io.data.writer.ProtocolDataWriter; import org.jumpmind.symmetric.io.data.writer.StagingDataWriter; import org.jumpmind.symmetric.io.data.writer.StructureDataWriter; import org.jumpmind.symmetric.io.data.writer.StructureDataWriter.PayloadType; import org.jumpmind.symmetric.io.data.writer.TransformWriter; import org.jumpmind.symmetric.io.stage.IStagedResource; import org.jumpmind.symmetric.io.stage.IStagedResource.State; import org.jumpmind.symmetric.io.stage.IStagingManager; import org.jumpmind.symmetric.io.stage.StagingFileLock; import org.jumpmind.symmetric.io.stage.StagingLowFreeSpace; import org.jumpmind.symmetric.load.IReloadVariableFilter; import org.jumpmind.symmetric.model.AbstractBatch.Status; import org.jumpmind.symmetric.model.Channel; import org.jumpmind.symmetric.model.ChannelMap; import org.jumpmind.symmetric.model.Data; import org.jumpmind.symmetric.model.DataMetaData; import org.jumpmind.symmetric.model.ExtractRequest; import org.jumpmind.symmetric.model.ExtractRequest.ExtractStatus; import org.jumpmind.symmetric.model.Node; import org.jumpmind.symmetric.model.NodeChannel; import org.jumpmind.symmetric.model.NodeCommunication; import org.jumpmind.symmetric.model.NodeCommunication.CommunicationType; import org.jumpmind.symmetric.model.NodeGroupLink; import org.jumpmind.symmetric.model.NodeGroupLinkAction; import org.jumpmind.symmetric.model.OutgoingBatch; import org.jumpmind.symmetric.model.OutgoingBatchWithPayload; import org.jumpmind.symmetric.model.OutgoingBatches; import org.jumpmind.symmetric.model.ProcessInfo; import org.jumpmind.symmetric.model.ProcessInfo.ProcessStatus; import org.jumpmind.symmetric.model.ProcessInfoDataWriter; import org.jumpmind.symmetric.model.ProcessInfoKey; import org.jumpmind.symmetric.model.ProcessType; import org.jumpmind.symmetric.model.RemoteNodeStatus; import org.jumpmind.symmetric.model.RemoteNodeStatuses; import org.jumpmind.symmetric.model.Router; import org.jumpmind.symmetric.model.TableReloadRequest; import org.jumpmind.symmetric.model.TableReloadStatus; import org.jumpmind.symmetric.model.Trigger; import org.jumpmind.symmetric.model.TriggerHistory; import org.jumpmind.symmetric.model.TriggerRouter; import org.jumpmind.symmetric.route.AbstractFileParsingRouter; import org.jumpmind.symmetric.route.IDataRouter; import org.jumpmind.symmetric.route.SimpleRouterContext; import org.jumpmind.symmetric.service.ClusterConstants; import org.jumpmind.symmetric.service.IClusterService; import org.jumpmind.symmetric.service.IConfigurationService; import org.jumpmind.symmetric.service.IDataExtractorService; import org.jumpmind.symmetric.service.IDataService; import org.jumpmind.symmetric.service.IExtensionService; import org.jumpmind.symmetric.service.IInitialLoadService; import org.jumpmind.symmetric.service.INodeCommunicationService; import org.jumpmind.symmetric.service.INodeCommunicationService.INodeCommunicationExecutor; import org.jumpmind.symmetric.service.INodeService; import org.jumpmind.symmetric.service.IOutgoingBatchService; import org.jumpmind.symmetric.service.IRouterService; import org.jumpmind.symmetric.service.ISequenceService; import org.jumpmind.symmetric.service.ITransformService; import org.jumpmind.symmetric.service.ITriggerRouterService; import org.jumpmind.symmetric.service.impl.TransformService.TransformTableNodeGroupLink; import org.jumpmind.symmetric.statistic.IStatisticManager; import org.jumpmind.symmetric.transport.BatchBufferedWriter; import org.jumpmind.symmetric.transport.IOutgoingTransport; import org.jumpmind.symmetric.transport.TransportUtils; import org.jumpmind.symmetric.util.CounterStat; import org.jumpmind.symmetric.util.SymmetricUtils; import org.jumpmind.util.AppUtils; import org.jumpmind.util.CustomizableThreadFactory; import org.jumpmind.util.ExceptionUtils; import org.jumpmind.util.FormatUtils; import org.jumpmind.util.FutureImpl; import org.jumpmind.util.Statistics; import org.slf4j.MDC; /** * @see IDataExtractorService */ public class DataExtractorService extends AbstractService implements IDataExtractorService, INodeCommunicationExecutor { final static long MS_PASSED_BEFORE_BATCH_REQUERIED = 5000; protected enum ExtractMode { FOR_SYM_CLIENT, FOR_PAYLOAD_CLIENT, EXTRACT_ONLY }; private ISymmetricEngine engine; IOutgoingBatchService outgoingBatchService; private IRouterService routerService; private IInitialLoadService initialLoadService; private IConfigurationService configurationService; private ITriggerRouterService triggerRouterService; private ITransformService transformService; private ISequenceService sequenceService; private IDataService dataService; private INodeService nodeService; IStatisticManager statisticManager; private IStagingManager stagingManager; private INodeCommunicationService nodeCommunicationService; private IClusterService clusterService; private IExtensionService extensionService; private Map<String, BatchLock> locks = new ConcurrentHashMap<String, BatchLock>(); private CustomizableThreadFactory threadPoolFactory; public DataExtractorService(ISymmetricEngine engine) { super(engine.getParameterService(), engine.getSymmetricDialect()); this.engine = engine; this.outgoingBatchService = engine.getOutgoingBatchService(); this.routerService = engine.getRouterService(); this.dataService = engine.getDataService(); this.configurationService = engine.getConfigurationService(); this.triggerRouterService = engine.getTriggerRouterService(); this.nodeService = engine.getNodeService(); this.transformService = engine.getTransformService(); this.statisticManager = engine.getStatisticManager(); this.stagingManager = engine.getStagingManager(); this.nodeCommunicationService = engine.getNodeCommunicationService(); this.clusterService = engine.getClusterService(); this.sequenceService = engine.getSequenceService(); this.extensionService = engine.getExtensionService(); this.initialLoadService = engine.getInitialLoadService(); setSqlMap(new DataExtractorServiceSqlMap(symmetricDialect.getPlatform(), createSqlReplacementTokens())); } /** * @see DataExtractorService#extractConfigurationStandalone(Node, Writer) */ public void extractConfigurationStandalone(Node node, OutputStream out) { this.extractConfigurationStandalone(node, TransportUtils.toWriter(out), TableConstants.SYM_MONITOR_EVENT, TableConstants.SYM_CONSOLE_EVENT); } public void extractConfigurationOnly(Node node, OutputStream out) { this.extractConfigurationStandalone(node, TransportUtils.toWriter(out), TableConstants.SYM_NODE, TableConstants.SYM_NODE_SECURITY, TableConstants.SYM_NODE_IDENTITY, TableConstants.SYM_NODE_HOST, TableConstants.SYM_FILE_SNAPSHOT, TableConstants.SYM_NODE_CHANNEL_CTL,TableConstants.SYM_CONSOLE_ROLE, TableConstants.SYM_CONSOLE_USER, TableConstants.SYM_CONSOLE_ROLE_PRIVILEGE, TableConstants.SYM_TABLE_RELOAD_REQUEST, TableConstants.SYM_MONITOR_EVENT, TableConstants.SYM_CONSOLE_EVENT); } protected boolean filter(Node targetNode, String tableName) { boolean pre37 = Version.isOlderThanVersion(targetNode.getSymmetricVersionParts(), Version.VERSION_3_7_0); boolean pre38 = Version.isOlderThanVersion(targetNode.getSymmetricVersionParts(), Version.VERSION_3_8_0); boolean pre3818 = Version.isOlderThanVersion(targetNode.getSymmetricVersionParts(), Version.VERSION_3_8_18); boolean pre39 = Version.isOlderThanVersion(targetNode.getSymmetricVersionParts(), Version.VERSION_3_9_0); boolean pre311 = Version.isOlderThanVersion(targetNode.getSymmetricVersionParts(), Version.VERSION_3_11_0); boolean pre312 = Version.isOlderThanVersion(targetNode.getSymmetricVersionParts(), Version.VERSION_3_12_0); tableName = tableName.toLowerCase(); boolean include = true; if (pre39 && (tableName.contains(TableConstants.SYM_JOB) || tableName.contains(TableConstants.SYM_CONSOLE_ROLE) || tableName.contains(TableConstants.SYM_CONSOLE_ROLE_PRIVILEGE))) { include = false; } else if (pre37 && tableName.contains(TableConstants.SYM_EXTENSION)) { include = false; } else if (pre38 && (tableName.contains(TableConstants.SYM_MONITOR) || tableName.contains(TableConstants.SYM_NOTIFICATION))) { include = false; } else if (pre3818 && tableName.contains(TableConstants.SYM_CONSOLE_USER_HIST)) { include = false; } else if (pre311 && (tableName.contains(TableConstants.SYM_CONSOLE_ROLE) || tableName.contains(TableConstants.SYM_CONSOLE_ROLE_PRIVILEGE))) { include = false; } else if (pre312 && (tableName.contains(TableConstants.SYM_DESIGN_DIAGRAM) || tableName.contains(TableConstants.SYM_DIAGRAM_GROUP))) { include = false; } else if (tableName.contains(TableConstants.SYM_CONSOLE_USER) || tableName.contains(TableConstants.SYM_CONSOLE_USER_HIST) || tableName.contains(TableConstants.SYM_CONSOLE_EVENT) || tableName.contains(TableConstants.SYM_CONSOLE_ROLE) || tableName.contains(TableConstants.SYM_CONSOLE_ROLE_PRIVILEGE) || tableName.contains(TableConstants.SYM_DESIGN_DIAGRAM) || tableName.contains(TableConstants.SYM_DIAGRAM_GROUP)) { boolean isTargetProfessional = StringUtils.equals(targetNode.getDeploymentType(), Constants.DEPLOYMENT_TYPE_PROFESSIONAL); if (!isTargetProfessional) { include = false; } } return include; } /** * Extract the SymmetricDS configuration for the passed in {@link Node}. */ public void extractConfigurationStandalone(Node targetNode, Writer writer, String... tablesToExclude) { Node sourceNode = nodeService.findIdentity(); if (targetNode != null && sourceNode != null) { Batch batch = new Batch(BatchType.EXTRACT, Constants.VIRTUAL_BATCH_FOR_REGISTRATION, Constants.CHANNEL_CONFIG, symmetricDialect.getBinaryEncoding(), sourceNode.getNodeId(), targetNode.getNodeId(), false); NodeGroupLink nodeGroupLink = new NodeGroupLink(parameterService.getNodeGroupId(), targetNode.getNodeGroupId()); List<TriggerRouter> triggerRouters = triggerRouterService .buildTriggerRoutersForSymmetricTables( StringUtils.isBlank(targetNode.getSymmetricVersion()) ? Version .version() : targetNode.getSymmetricVersion(), nodeGroupLink, tablesToExclude); List<SelectFromTableEvent> initialLoadEvents = new ArrayList<SelectFromTableEvent>( triggerRouters.size() * 2); for (int i = triggerRouters.size() - 1; i >= 0; i--) { TriggerRouter triggerRouter = triggerRouters.get(i); String channelId = triggerRouter.getTrigger().getChannelId(); if (Constants.CHANNEL_CONFIG.equals(channelId) || Constants.CHANNEL_HEARTBEAT.equals(channelId)) { if (filter(targetNode, triggerRouter.getTrigger().getSourceTableName())) { TriggerHistory triggerHistory = triggerRouterService .getNewestTriggerHistoryForTrigger(triggerRouter.getTrigger() .getTriggerId(), null, null, triggerRouter.getTrigger() .getSourceTableName()); if (triggerHistory == null) { Trigger trigger = triggerRouter.getTrigger(); Table table = symmetricDialect.getPlatform().getTableFromCache( trigger.getSourceCatalogName(), trigger.getSourceSchemaName(), trigger.getSourceTableName(), false); if (table == null) { throw new IllegalStateException("Could not find a required table: " + triggerRouter.getTrigger().getSourceTableName()); } triggerHistory = new TriggerHistory(table, triggerRouter.getTrigger(), symmetricDialect.getTriggerTemplate()); triggerHistory.setTriggerHistoryId(Integer.MAX_VALUE - i); } StringBuilder sql = new StringBuilder(symmetricDialect.createPurgeSqlFor( targetNode, triggerRouter, triggerHistory)); addPurgeCriteriaToConfigurationTables(triggerRouter.getTrigger() .getSourceTableName(), sql); String sourceTable = triggerHistory.getSourceTableName(); Data data = new Data(1, null, sql.toString(), DataEventType.SQL, sourceTable, null, triggerHistory, triggerRouter.getTrigger() .getChannelId(), null, null); initialLoadEvents.add(new SelectFromTableEvent(data, triggerRouter)); } } } for (int i = 0; i < triggerRouters.size(); i++) { TriggerRouter triggerRouter = triggerRouters.get(i); String channelId = triggerRouter.getTrigger().getChannelId(); if (Constants.CHANNEL_CONFIG.equals(channelId) || Constants.CHANNEL_HEARTBEAT.equals(channelId)) { if (filter(targetNode, triggerRouter.getTrigger().getSourceTableName())) { TriggerHistory triggerHistory = triggerRouterService .getNewestTriggerHistoryForTrigger(triggerRouter.getTrigger() .getTriggerId(), null, null, null); if (triggerHistory == null) { Trigger trigger = triggerRouter.getTrigger(); triggerHistory = new TriggerHistory(symmetricDialect.getPlatform() .getTableFromCache(trigger.getSourceCatalogName(), trigger.getSourceSchemaName(), trigger.getSourceTableName(), false), trigger, symmetricDialect.getTriggerTemplate()); triggerHistory.setTriggerHistoryId(Integer.MAX_VALUE - i); } Table table = symmetricDialect.getPlatform().getTableFromCache( triggerHistory.getSourceCatalogName(), triggerHistory.getSourceSchemaName(), triggerHistory.getSourceTableName(), false); String initialLoadSql = "1=1 order by "; String quote = symmetricDialect.getPlatform().getDdlBuilder().getDatabaseInfo().getDelimiterToken(); Column[] pkColumns = table.getPrimaryKeyColumns(); for (int j = 0; j < pkColumns.length; j++) { if (j > 0) { initialLoadSql += ", "; } initialLoadSql += quote + pkColumns[j].getName() + quote; } if (!triggerRouter.getTrigger().getSourceTableName() .endsWith(TableConstants.SYM_NODE_IDENTITY)) { initialLoadEvents.add(new SelectFromTableEvent(targetNode, triggerRouter, triggerHistory, initialLoadSql)); } else { Data data = new Data(1, null, targetNode.getNodeId(), DataEventType.INSERT, triggerHistory.getSourceTableName(), null, triggerHistory, triggerRouter.getTrigger().getChannelId(), null, null); initialLoadEvents.add(new SelectFromTableEvent(data, triggerRouter)); } } } } SelectFromTableSource source = new SelectFromTableSource(batch, initialLoadEvents); source.setConfiguration(true); ExtractDataReader dataReader = new ExtractDataReader( this.symmetricDialect.getPlatform(), source); ProtocolDataWriter dataWriter = new ProtocolDataWriter( nodeService.findIdentityNodeId(), writer, targetNode.requires13Compatiblity(), false, false); List<TransformTableNodeGroupLink> transformsList = transformService.getConfigExtractTransforms(nodeGroupLink); TransformTable[] transforms = transformsList.toArray(new TransformTable[transformsList.size()]); TransformWriter transformWriter = new TransformWriter(symmetricDialect.getTargetPlatform(), TransformPoint.EXTRACT, dataWriter, transformService.getColumnTransforms(), transforms); DataContext ctx = new DataContext(); DataProcessor processor = new DataProcessor(dataReader, transformWriter, "configuration extract"); ctx.put(Constants.DATA_CONTEXT_TARGET_NODE, targetNode); ctx.put(Constants.DATA_CONTEXT_SOURCE_NODE, sourceNode); ctx.put(Constants.DATA_CONTEXT_ENGINE, engine); processor.process(ctx); if (triggerRouters.size() == 0) { log.error("{} attempted registration, but was sent an empty configuration", targetNode); } } } private void addPurgeCriteriaToConfigurationTables(String sourceTableName, StringBuilder sql) { if ((TableConstants .getTableName(parameterService.getTablePrefix(), TableConstants.SYM_NODE) .equalsIgnoreCase(sourceTableName)) || TableConstants.getTableName(parameterService.getTablePrefix(), TableConstants.SYM_NODE_SECURITY).equalsIgnoreCase(sourceTableName)) { Node me = nodeService.findIdentity(); if (me != null) { sql.append(String.format(" where created_at_node_id='%s'", me.getNodeId())); } } } private List<OutgoingBatch> filterBatchesForExtraction(OutgoingBatches batches, ChannelMap suspendIgnoreChannelsList) { if (parameterService.is(ParameterConstants.FILE_SYNC_ENABLE)) { List<Channel> fileSyncChannels = configurationService.getFileSyncChannels(); for (Channel channel : fileSyncChannels) { batches.filterBatchesForChannel(channel); } } // We now have either our local suspend/ignore list, or the combined // remote send/ignore list and our local list (along with a // reservation, if we go this far...) // Now, we need to skip the suspended channels and ignore the // ignored ones by ultimately setting the status to ignored and // updating them. List<OutgoingBatch> ignoredBatches = batches .filterBatchesForChannels(suspendIgnoreChannelsList.getIgnoreChannels()); // Finally, update the ignored outgoing batches such that they // will be skipped in the future. for (OutgoingBatch batch : ignoredBatches) { batch.setStatus(OutgoingBatch.Status.OK); batch.incrementIgnoreCount(); if (log.isDebugEnabled()) { log.debug("Batch {} is being ignored", batch.getBatchId()); } } outgoingBatchService.updateOutgoingBatches(ignoredBatches); batches.filterBatchesForChannels(suspendIgnoreChannelsList.getSuspendChannels()); // Remove non-load batches so that an initial load finishes before // any other batches are loaded. if (parameterService.is(ParameterConstants.INITIAL_LOAD_BLOCK_CHANNELS, true)) { if (batches.containsLoadBatches() && !(parameterService.is(ParameterConstants.INITIAL_LOAD_UNBLOCK_CHANNELS_ON_ERROR, true) && batches.containsBatchesInError())) { batches.removeNonLoadBatches(); } } return batches.getBatches(); } public List<OutgoingBatchWithPayload> extractToPayload(ProcessInfo processInfo, Node targetNode, PayloadType payloadType, boolean useJdbcTimestampFormat, boolean useUpsertStatements, boolean useDelimiterIdentifiers) { OutgoingBatches batches = outgoingBatchService.getOutgoingBatches(targetNode.getNodeId(), false); if (batches.containsBatches()) { ChannelMap channelMap = configurationService.getSuspendIgnoreChannelLists(targetNode .getNodeId()); List<OutgoingBatch> activeBatches = filterBatchesForExtraction(batches, channelMap); if (activeBatches.size() > 0) { IDdlBuilder builder = DdlBuilderFactory.createDdlBuilder(targetNode .getDatabaseType()); if (builder == null) { throw new IllegalStateException( "Could not find a ddl builder registered for the database type of " + targetNode.getDatabaseType() + ". Please check the database type setting for node '" + targetNode.getNodeId() + "'"); } StructureDataWriter writer = new StructureDataWriter( symmetricDialect.getPlatform(), targetNode.getDatabaseType(), payloadType, useDelimiterIdentifiers, symmetricDialect.getBinaryEncoding(), useJdbcTimestampFormat, useUpsertStatements); List<OutgoingBatch> extractedBatches = extract(processInfo, targetNode, activeBatches, writer, null, ExtractMode.FOR_PAYLOAD_CLIENT); List<OutgoingBatchWithPayload> batchesWithPayload = new ArrayList<OutgoingBatchWithPayload>(); for (OutgoingBatch batch : extractedBatches) { OutgoingBatchWithPayload batchWithPayload = new OutgoingBatchWithPayload(batch, payloadType); batchWithPayload.setPayload(writer.getPayloadMap().get(batch.getBatchId())); batchWithPayload.setPayloadType(payloadType); batchesWithPayload.add(batchWithPayload); } return batchesWithPayload; } } return Collections.emptyList(); } public List<OutgoingBatch> extract(ProcessInfo extractInfo, Node targetNode, IOutgoingTransport transport) { return extract(extractInfo, targetNode, null, transport); } public List<OutgoingBatch> extract(ProcessInfo extractInfo, Node targetNode, String queue, IOutgoingTransport transport) { /* * make sure that data is routed before extracting if the route job is * not configured to start automatically */ String startRouteJob = parameterService.getString(ParameterConstants.START_ROUTE_JOB_38); boolean startRoutingJob = false; if (StringUtils.isBlank(startRouteJob)) { startRoutingJob = parameterService.is(ParameterConstants.START_ROUTE_JOB); } else { startRoutingJob = parameterService.is(ParameterConstants.START_ROUTE_JOB_38); } if (!startRoutingJob && parameterService.is(ParameterConstants.ROUTE_ON_EXTRACT)) { initialLoadService.queueLoads(true); routerService.routeData(true); } OutgoingBatches batches = loadPendingBatches(extractInfo, targetNode, queue, transport); if (batches != null && batches.containsBatches()) { ChannelMap channelMap = transport.getSuspendIgnoreChannelLists(configurationService, queue, targetNode); List<OutgoingBatch> activeBatches = filterBatchesForExtraction(batches, channelMap); if (activeBatches.size() > 0) { BufferedWriter writer = transport.openWriter(); IDataWriter dataWriter = new ProtocolDataWriter(nodeService.findIdentityNodeId(), writer, targetNode.requires13Compatiblity(), targetNode.allowCaptureTimeInProtocol(), parameterService.is(ParameterConstants.EXTRACT_ROW_CAPTURE_TIME, true)); return extract(extractInfo, targetNode, activeBatches, dataWriter, writer, ExtractMode.FOR_SYM_CLIENT); } } return Collections.emptyList(); } protected OutgoingBatches loadPendingBatches(ProcessInfo extractInfo, Node targetNode, String queue, IOutgoingTransport transport) { BufferedWriter writer = transport.getWriter(); extractInfo.setStatus(ProcessStatus.QUERYING); Callable<OutgoingBatches> getOutgoingBatches = () -> { OutgoingBatches batches = null; if (queue != null) { NodeGroupLink link = configurationService.getNodeGroupLinkFor(nodeService.findIdentity().getNodeGroupId(), targetNode.getNodeGroupId(), false); if (link != null) { NodeGroupLinkAction defaultAction = configurationService.getNodeGroupLinkFor(nodeService.findIdentity().getNodeGroupId(), targetNode.getNodeGroupId(), false).getDataEventAction(); ProcessType processType = extractInfo.getKey().getProcessType(); NodeGroupLinkAction action = null; if (processType.equals(ProcessType.PUSH_JOB_EXTRACT)) { action = NodeGroupLinkAction.P; } else if (processType.equals(ProcessType.PULL_HANDLER_EXTRACT)) { action = NodeGroupLinkAction.W; } batches = outgoingBatchService.getOutgoingBatches(targetNode.getNodeId(), queue, action, defaultAction, false); } else { log.error("Group link not found for " + nodeService.findIdentity().getNodeGroupId() + " to " + targetNode.getNodeGroupId() + ". Check that configuration matches on both nodes."); } } else { batches = outgoingBatchService.getOutgoingBatches(targetNode.getNodeId(), false); } return batches; }; if (writer != null) { final boolean streamToFileEnabled = parameterService.is(ParameterConstants.STREAM_TO_FILE_ENABLED); long keepAliveMillis = parameterService.getLong(ParameterConstants.DATA_LOADER_SEND_ACK_KEEPALIVE); Node sourceNode = nodeService.findIdentity(); FutureTask<OutgoingBatches> getOutgoingBatchesTask = new FutureTask<OutgoingBatches>(getOutgoingBatches); ExecutorService executor = Executors.newFixedThreadPool(1); executor.execute(getOutgoingBatchesTask); try { while (true) { try { return getOutgoingBatchesTask.get(keepAliveMillis, TimeUnit.MILLISECONDS); } catch (TimeoutException ex) { writeKeepAliveAck(writer, sourceNode, streamToFileEnabled); } catch (Exception ex) { throw new SymmetricException("Failed to execute getOutgoingBatchesTask ", ex); } } } finally { executor.shutdown(); } } else { try { return getOutgoingBatches.call(); } catch (Exception ex) { throw new SymmetricException("Failed to execute getOutgoingBatchesTask ", ex); } } } /** * This method will extract an outgoing batch, but will not update the outgoing batch status */ public boolean extractOnlyOutgoingBatch(String nodeId, long batchId, Writer writer) { boolean extracted = false; Node targetNode = null; if (Constants.UNROUTED_NODE_ID.equals(nodeId)) { targetNode = new Node(nodeId, parameterService.getNodeGroupId()); } else { targetNode = nodeService.findNode(nodeId, true); } if (targetNode != null) { OutgoingBatch batch = outgoingBatchService.findOutgoingBatch(batchId, nodeId); if (batch != null) { IDataWriter dataWriter = new ProtocolDataWriter(nodeService.findIdentityNodeId(), writer, targetNode.requires13Compatiblity(), targetNode.allowCaptureTimeInProtocol(), parameterService.is(ParameterConstants.EXTRACT_ROW_CAPTURE_TIME, true)); List<OutgoingBatch> batches = new ArrayList<OutgoingBatch>(1); batches.add(batch); batches = extract(new ProcessInfo(), targetNode, batches, dataWriter, null, ExtractMode.EXTRACT_ONLY); extracted = batches.size() > 0; } } return extracted; } protected List<OutgoingBatch> extract(final ProcessInfo extractInfo, final Node targetNode, final List<OutgoingBatch> activeBatches, final IDataWriter dataWriter, final BufferedWriter writer, final ExtractMode mode) { if (activeBatches.size() > 0) { final List<OutgoingBatch> processedBatches = new ArrayList<OutgoingBatch>(activeBatches.size()); Set<String> channelsProcessed = new HashSet<String>(); long batchesSelectedAtMs = System.currentTimeMillis(); OutgoingBatch currentBatch = null; ExecutorService executor = null; try { final boolean streamToFileEnabled = parameterService.is(ParameterConstants.STREAM_TO_FILE_ENABLED); long keepAliveMillis = parameterService.getLong(ParameterConstants.DATA_LOADER_SEND_ACK_KEEPALIVE); Node sourceNode = nodeService.findIdentity(); final FutureExtractStatus status = new FutureExtractStatus(); if (this.threadPoolFactory == null) { this.threadPoolFactory = new CustomizableThreadFactory(String.format("%s-dataextractor", parameterService.getEngineName().toLowerCase())); } executor = streamToFileEnabled ? Executors.newFixedThreadPool(1, this.threadPoolFactory) : null; List<Future<FutureOutgoingBatch>> futures = new ArrayList<Future<FutureOutgoingBatch>>(); extractInfo.setTotalBatchCount(activeBatches.size()); for (int i = 0; i < activeBatches.size(); i++) { currentBatch = activeBatches.get(i); channelsProcessed.add(currentBatch.getChannelId()); final OutgoingBatch extractBatch = currentBatch; Callable<FutureOutgoingBatch> callable = () -> { MDC.put("engineName", engine.getParameterService().getEngineName()); OutgoingBatch refreshedBatch = requeryIfEnoughTimeHasPassed(batchesSelectedAtMs, extractBatch); return extractBatch(refreshedBatch, status, extractInfo, targetNode, dataWriter, mode, activeBatches); }; if (status.shouldExtractSkip) { break; } if (executor != null) { futures.add(executor.submit(callable)); } else { try { FutureOutgoingBatch batch = callable.call(); futures.add(new FutureImpl<>(batch)); } catch (RuntimeException e) { throw e; } catch (Exception e) { throw new RuntimeException(e); } } } if (parameterService.is(ParameterConstants.SYNCHRONIZE_ALL_JOBS) && executor != null) { executor.shutdown(); boolean isProcessed = false; while (!isProcessed) { try { isProcessed = executor.awaitTermination(keepAliveMillis, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { throw new RuntimeException(e); } if (!isProcessed) { writeKeepAliveAck(writer, sourceNode, streamToFileEnabled); } } } final long initialLoadMaxBytesToSync = parameterService.getLong(ParameterConstants.INITIAL_LOAD_TRANSPORT_MAX_BYTES_TO_SYNC); long totalBytesSend = 0; boolean logMaxBytesReached = false; Iterator<OutgoingBatch> activeBatchIter = activeBatches.iterator(); for (int i = 0; i < futures.size(); i++) { Future<FutureOutgoingBatch> future = futures.get(i); currentBatch = activeBatchIter.next(); boolean isProcessed = false; ProcessInfo transferInfo = null; while (!isProcessed) { try { FutureOutgoingBatch extractBatch = future.get(keepAliveMillis, TimeUnit.MILLISECONDS); transferInfo = statisticManager.newProcessInfo(new ProcessInfoKey(nodeService.findIdentityNodeId(), extractInfo.getQueue(), targetNode.getNodeId(), extractInfo.getProcessType() == ProcessType.PUSH_JOB_EXTRACT ? ProcessType.PUSH_JOB_TRANSFER : ProcessType.PULL_HANDLER_TRANSFER)); transferInfo.setCurrentBatchId(currentBatch.getBatchId()); transferInfo.incrementBatchCount(); transferInfo.setTotalDataCount(currentBatch.getExtractRowCount()); currentBatch = extractBatch.getOutgoingBatch(); if (i == futures.size() - 1) { extractInfo.setStatus(ProcessStatus.OK); } if (extractBatch.isExtractSkipped) { transferInfo.setStatus(ProcessStatus.OK); break; } if (streamToFileEnabled || mode == ExtractMode.FOR_PAYLOAD_CLIENT || (currentBatch.isExtractJobFlag() && parameterService.is(ParameterConstants.INITIAL_LOAD_USE_EXTRACT_JOB))) { if(totalBytesSend > initialLoadMaxBytesToSync) { if(!logMaxBytesReached) { logMaxBytesReached = true; log.info( "Reached the total byte threshold for initial load after {} of {} batches were sent for node '{}' (sent {} bytes, the max is {}). " + "The remaining batches will be send on a subsequent sync.", new Object[] { i, futures.size(), targetNode.getNodeId(), totalBytesSend, initialLoadMaxBytesToSync }); } transferInfo.setStatus(ProcessStatus.OK); break; } transferInfo.setStatus(ProcessInfo.ProcessStatus.TRANSFERRING); transferInfo.setCurrentLoadId(currentBatch.getLoadId()); boolean isRetry = extractBatch.isRetry() && extractBatch.getOutgoingBatch().getStatus() != OutgoingBatch.Status.IG; currentBatch = sendOutgoingBatch(transferInfo, targetNode, currentBatch, isRetry, dataWriter, writer, mode); totalBytesSend += currentBatch.getByteCount(); } processedBatches.add(currentBatch); if (currentBatch.getStatus() != Status.OK) { currentBatch.setLoadCount(currentBatch.getLoadCount() + 1); changeBatchStatus(Status.LD, currentBatch, mode); } if (currentBatch.getLoadId() > 0) { long transferMillis = transferInfo.getEndTime() == null ? new Date().getTime() - transferInfo.getStartTime().getTime() : transferInfo.getEndTime().getTime() - transferInfo.getStartTime().getTime(); updateExtractRequestTransferred(currentBatch, transferMillis); } transferInfo.setCurrentTableName(currentBatch.getSummary()); transferInfo.setStatus(ProcessStatus.OK); isProcessed = true; } catch (TimeoutException e) { writeKeepAliveAck(writer, sourceNode, streamToFileEnabled); } catch (Exception e) { if (transferInfo != null && transferInfo.getStatus() != ProcessStatus.OK) { transferInfo.setStatus(ProcessStatus.ERROR); } if (e instanceof ExecutionException) { if (isNotBlank(e.getMessage()) && e.getMessage().contains("string truncation")) { throw new RuntimeException( "There is a good chance that the truncation error you are receiving is because contains_big_lobs on the '" + currentBatch.getChannelId() + "' channel needs to be turned on.", e.getCause() != null ? e.getCause() : e); } if (e.getCause() instanceof RuntimeException) { throw (RuntimeException) e.getCause(); } throw new RuntimeException(e.getCause() != null ? e.getCause() : e); } else if (!(e instanceof RuntimeException)) { throw new RuntimeException(e); } else if (e instanceof RuntimeException) { throw (RuntimeException) e; } } } } } catch (RuntimeException e) { if (currentBatch != null) { boolean isNewErrorStaging = false; if (!isStreamClosedByClient(e)) { if (e.getCause() instanceof InterruptedException || e.getCause() instanceof CancellationException) { log.info("Extract of batch {} was interrupted", currentBatch); } else if (e instanceof StagingLowFreeSpace) { log.error("Extract is disabled because disk is almost full: {}", e.getMessage()); } else if (e.getCause() instanceof ZipException || e instanceof ProtocolException || e instanceof IllegalStateException) { if (currentBatch.getSqlCode() != ErrorConstants.STAGE_ERROR_CODE) { isNewErrorStaging = true; } log.warn("The batch {} appears corrupt in staging, so removing it. ({})", currentBatch.getNodeBatchId(), e.getMessage()); IStagedResource resource = getStagedResource(currentBatch); if (resource != null) { resource.delete(); } } else { log.error("Failed to extract batch " + currentBatch, e); } } try { /* Reread batch in case the ignore flag has been set */ currentBatch = outgoingBatchService.findOutgoingBatch(currentBatch.getBatchId(), currentBatch.getNodeId()); SQLException se = ExceptionUtils.unwrapSqlException(e); if (se != null) { currentBatch.setSqlState(se.getSQLState()); currentBatch.setSqlCode(se.getErrorCode()); currentBatch.setSqlMessage(se.getMessage()); } else if (isNewErrorStaging) { currentBatch.setSqlState(ErrorConstants.STAGE_ERROR_STATE); currentBatch.setSqlCode(ErrorConstants.STAGE_ERROR_CODE); currentBatch.setSqlMessage(ExceptionUtils.getRootMessage(e)); } else { currentBatch.setSqlMessage(ExceptionUtils.getRootMessage(e)); } currentBatch.revertStatsOnError(); if (currentBatch.getStatus() != Status.IG && currentBatch.getStatus() != Status.OK) { currentBatch.setStatus(Status.ER); currentBatch.setErrorFlag(isNewErrorStaging ? false : true); statisticManager.incrementDataExtractedErrors(currentBatch.getChannelId(), 1); extractInfo.setStatus(ProcessInfo.ProcessStatus.ERROR); } outgoingBatchService.updateOutgoingBatch(currentBatch); } catch(Exception ex) { log.error("Failed to update the outgoing batch status for failed batch {}", currentBatch, ex); extractInfo.setStatus(ProcessInfo.ProcessStatus.ERROR); } } else { log.error("Could not log the outgoing batch status because the batch was null", e); } } finally { if (executor != null) { executor.shutdown(); } } // Next, we update the node channel controls to the // current timestamp Calendar now = Calendar.getInstance(); for (String channelProcessed : channelsProcessed) { NodeChannel nodeChannel = configurationService.getNodeChannel(channelProcessed, targetNode.getNodeId(), false); if (nodeChannel != null && nodeChannel.getExtractPeriodMillis() > 0) { nodeChannel.setLastExtractTime(now.getTime()); configurationService.updateLastExtractTime(nodeChannel); } } return processedBatches; } else { return Collections.emptyList(); } } protected FutureOutgoingBatch extractBatch(OutgoingBatch extractBatch, FutureExtractStatus status, ProcessInfo extractInfo, Node targetNode, IDataWriter dataWriter, ExtractMode mode, List<OutgoingBatch> activeBatches) throws Exception { extractInfo.setThread(Thread.currentThread()); extractInfo.setCurrentLoadId(extractBatch.getLoadId()); extractInfo.setTotalDataCount(extractBatch.getDataRowCount()); FutureOutgoingBatch outgoingBatch = new FutureOutgoingBatch(extractBatch, false); final long maxBytesToSync = parameterService.getLong(ParameterConstants.TRANSPORT_MAX_BYTES_TO_SYNC); final boolean streamToFileEnabled = parameterService.is(ParameterConstants.STREAM_TO_FILE_ENABLED); if (!status.shouldExtractSkip) { if (extractBatch.isExtractJobFlag() && extractBatch.getStatus() != Status.IG) { if (parameterService.is(ParameterConstants.INITIAL_LOAD_USE_EXTRACT_JOB)) { if (extractBatch.getStatus() != Status.RQ && extractBatch.getStatus() != Status.IG && !isPreviouslyExtracted(extractBatch, false)) { /* * the batch must have been purged. it needs to be * re-extracted */ log.info("Batch {} is marked as ready but it is missing in staging. Rescheduling it for extraction.", extractBatch.getNodeBatchId()); if (mode != ExtractMode.EXTRACT_ONLY) { resetExtractRequest(extractBatch); } status.shouldExtractSkip = outgoingBatch.isExtractSkipped = true; } else if (extractBatch.getStatus() == Status.RQ) { log.info("Batch {} is not ready for delivery. It is currently scheduled for extraction.", extractBatch.getNodeBatchId()); status.shouldExtractSkip = outgoingBatch.isExtractSkipped = true; } } else { extractBatch.setStatus(Status.NE); extractBatch.setExtractJobFlag(false); } } else { try { boolean isRetry = isRetry(extractBatch, targetNode); outgoingBatch = new FutureOutgoingBatch( extractOutgoingBatch(extractInfo, targetNode, dataWriter, extractBatch, streamToFileEnabled, true, mode, null), isRetry); status.batchExtractCount++; status.byteExtractCount += extractBatch.getByteCount(); if (status.byteExtractCount >= maxBytesToSync && status.batchExtractCount < activeBatches.size()) { log.info( "Reached the total byte threshold after {} of {} batches were extracted for node '{}' (extracted {} bytes, the max is {}). " + "The remaining batches will be extracted on a subsequent sync.", new Object[] { status.batchExtractCount, activeBatches.size(), targetNode.getNodeId(), status.byteExtractCount, maxBytesToSync }); status.shouldExtractSkip = true; } } catch (Exception e) { status.shouldExtractSkip = outgoingBatch.isExtractSkipped = true; throw e; } } } else { outgoingBatch.isExtractSkipped = true; } return outgoingBatch; } protected void writeKeepAliveAck(BufferedWriter writer, Node sourceNode, boolean streamToFileEnabled) { try { if (writer != null && streamToFileEnabled) { writer.write(CsvConstants.NODEID + "," + sourceNode.getNodeId()); writer.newLine(); writer.flush(); } } catch (IOException ex) { } } final protected boolean changeBatchStatus(Status status, OutgoingBatch currentBatch, ExtractMode mode) { if (currentBatch.getStatus() != Status.IG) { currentBatch.setStatus(status); } if (mode != ExtractMode.EXTRACT_ONLY) { long batchStatusUpdateMillis = parameterService.getLong(ParameterConstants.OUTGOING_BATCH_UPDATE_STATUS_MILLIS); int batchStatusUpdateDataCount = parameterService.getInt(ParameterConstants.OUTGOING_BATCH_UPDATE_STATUS_DATA_COUNT); Channel channel = configurationService.getChannel(currentBatch.getChannelId()); if (currentBatch.getStatus() == Status.RQ || currentBatch.getStatus() == Status.LD || currentBatch.getLastUpdatedTime() == null || System.currentTimeMillis() - batchStatusUpdateMillis >= currentBatch.getLastUpdatedTime().getTime() || channel.isReloadFlag() || currentBatch.getDataRowCount() > batchStatusUpdateDataCount) { outgoingBatchService.updateOutgoingBatch(currentBatch); return true; } return false; } else { return false; } } /** * If time has passed, then re-query the batch to double check that the * status has not changed */ final protected OutgoingBatch requeryIfEnoughTimeHasPassed(long ts, OutgoingBatch currentBatch) { if (System.currentTimeMillis() - ts > MS_PASSED_BEFORE_BATCH_REQUERIED) { OutgoingBatch batch = outgoingBatchService.findOutgoingBatch(currentBatch.getBatchId(), currentBatch.getNodeId()); if (batch != null && !batch.getStatus().equals(currentBatch.getStatus())) { currentBatch.setStatus(batch.getStatus()); } } return currentBatch; } protected OutgoingBatch extractOutgoingBatch(ProcessInfo extractInfo, Node targetNode, IDataWriter dataWriter, OutgoingBatch currentBatch, boolean useStagingDataWriter, boolean updateBatchStatistics, ExtractMode mode, IDataProcessorListener listener) { if (currentBatch.getStatus() != Status.OK || ExtractMode.EXTRACT_ONLY == mode || ExtractMode.FOR_SYM_CLIENT == mode) { Node sourceNode = nodeService.findIdentity(); IDataWriter writer = wrapWithTransformWriter(sourceNode, targetNode, extractInfo, dataWriter, useStagingDataWriter); long ts = System.currentTimeMillis(); long extractTimeInMs = 0l; long byteCount = 0l; long transformTimeInMs = 0l; if (currentBatch.getStatus() == Status.IG) { cleanupIgnoredBatch(sourceNode, targetNode, currentBatch, writer); } else if (currentBatch.getStatus() == Status.RQ || !isPreviouslyExtracted(currentBatch, false)) { BatchLock lock = null; try { log.debug("{} attempting to acquire lock for batch {}", targetNode.getNodeId(), currentBatch.getBatchId()); lock = acquireLock(currentBatch, useStagingDataWriter); log.debug("{} acquired lock for batch {}", targetNode.getNodeId(), currentBatch.getBatchId()); if (currentBatch.getStatus() == Status.RQ || !isPreviouslyExtracted(currentBatch, true)) { log.debug("{} extracting batch {}", targetNode.getNodeId(), currentBatch.getBatchId()); currentBatch.setExtractCount(currentBatch.getExtractCount() + 1); if (currentBatch.getExtractStartTime() == null) { currentBatch.setExtractStartTime(new Date()); } if (updateBatchStatistics) { changeBatchStatus(Status.QY, currentBatch, mode); } DataContext ctx = new DataContext(); ctx.put(Constants.DATA_CONTEXT_TARGET_NODE_ID, targetNode.getNodeId()); ctx.put(Constants.DATA_CONTEXT_TARGET_NODE_EXTERNAL_ID, targetNode.getExternalId()); ctx.put(Constants.DATA_CONTEXT_TARGET_NODE_GROUP_ID, targetNode.getNodeGroupId()); ctx.put(Constants.DATA_CONTEXT_TARGET_NODE, targetNode); ctx.put(Constants.DATA_CONTEXT_SOURCE_NODE, sourceNode); ctx.put(Constants.DATA_CONTEXT_SOURCE_NODE_ID, sourceNode.getNodeId()); ctx.put(Constants.DATA_CONTEXT_SOURCE_NODE_EXTERNAL_ID, sourceNode.getExternalId()); ctx.put(Constants.DATA_CONTEXT_SOURCE_NODE_GROUP_ID, sourceNode.getNodeGroupId()); ctx.put(Constants.DATA_CONTEXT_ENGINE, engine); extractInfo.setTotalDataCount(currentBatch.getDataRowCount()); currentBatch.resetStats(); IDataReader dataReader = buildExtractDataReader(sourceNode, targetNode, currentBatch, extractInfo); try { new DataProcessor(dataReader, writer, listener, "extract").process(ctx); } catch (Exception e) { if ((e instanceof ProtocolException || (e.getCause() != null && e.getCause() instanceof SQLException && ((SQLException) e.getCause()).getErrorCode() == 6502)) && !configurationService.getNodeChannel(currentBatch.getChannelId(), false).getChannel().isContainsBigLob()) { log.warn(e.getMessage()); log.info("Re-attempting extraction for batch {} with contains_big_lobs temporarily enabled for channel {}", currentBatch.getBatchId(), currentBatch.getChannelId()); extractInfo.setTotalDataCount(currentBatch.getDataRowCount()); currentBatch.resetStats(); IStagedResource resource = getStagedResource(currentBatch); if (resource != null) { resource.delete(); } dataReader = buildExtractDataReader(sourceNode, targetNode, currentBatch, extractInfo, true); writer = wrapWithTransformWriter(sourceNode, targetNode, extractInfo, dataWriter, useStagingDataWriter); new DataProcessor(dataReader, writer, listener, "extract").process(ctx); } else { throw e; } } extractTimeInMs = System.currentTimeMillis() - ts; Statistics stats = getExtractStats(writer, currentBatch); if (stats != null) { transformTimeInMs = stats.get(DataWriterStatisticConstants.TRANSFORMMILLIS); currentBatch.setDataRowCount(stats.get(DataWriterStatisticConstants.ROWCOUNT)); currentBatch.setDataInsertRowCount(stats.get(DataWriterStatisticConstants.INSERTCOUNT)); currentBatch.setDataUpdateRowCount(stats.get(DataWriterStatisticConstants.UPDATECOUNT)); currentBatch.setDataDeleteRowCount(stats.get(DataWriterStatisticConstants.DELETECOUNT)); currentBatch.setTableExtractedCount(stats.getTableStats()); currentBatch.setTransformExtractMillis(transformTimeInMs); extractTimeInMs = extractTimeInMs - transformTimeInMs; byteCount = stats.get(DataWriterStatisticConstants.BYTECOUNT); statisticManager.incrementDataBytesExtracted(currentBatch.getChannelId(), byteCount); statisticManager.incrementDataExtracted(currentBatch.getChannelId(), stats.get(DataWriterStatisticConstants.ROWCOUNT)); statisticManager.incrementTableRows(currentBatch.getTableExtractedCount(), false); currentBatch.setByteCount(byteCount); if (!useStagingDataWriter) { statisticManager.incrementDataBytesSent(currentBatch.getChannelId(), byteCount); statisticManager.incrementDataSent(currentBatch.getChannelId(), stats.get(DataWriterStatisticConstants.ROWCOUNT)); } if (currentBatch.isCommonFlag()) { outgoingBatchService.updateCommonBatchExtractStatistics(currentBatch); } } } } catch (RuntimeException ex) { IStagedResource resource = getStagedResource(currentBatch); if (resource != null) { resource.close(); resource.delete(); } throw ex; } finally { try { IStagedResource resource = getStagedResource(currentBatch); if (resource != null) { resource.setState(State.DONE); } } finally { releaseLock(lock, currentBatch, useStagingDataWriter); log.debug("{} released lock for batch {}", targetNode.getNodeId(), currentBatch.getBatchId()); } } } if (updateBatchStatistics) { currentBatch = requeryIfEnoughTimeHasPassed(ts, currentBatch); if (extractTimeInMs > 0) { currentBatch.setExtractMillis(extractTimeInMs); } if (byteCount > 0) { currentBatch.setByteCount(byteCount); } if (transformTimeInMs > 0) { currentBatch.setTransformExtractMillis(transformTimeInMs); } if (currentBatch.getLoadId() > 0 && (currentBatch.getSummary() == null || !currentBatch.getSummary().startsWith(symmetricDialect.getTablePrefix()))) { if (currentBatch.getExtractRowCount() != currentBatch.getDataRowCount()) { currentBatch.setDataRowCount(currentBatch.getExtractRowCount()); currentBatch.setDataInsertRowCount(currentBatch.getExtractInsertRowCount()); } ExtractRequest extractRequest = getExtractRequestForBatch(currentBatch); if (extractRequest != null) { sqlTemplate.update(getSql("updateExtractRequestStatus"), ExtractStatus.OK.name(), currentBatch.getExtractRowCount(), currentBatch.getExtractMillis(), extractRequest.getRequestId()); checkSendDeferredConstraints(extractRequest, null, targetNode); } } } } return currentBatch; } protected String getSemaphoreKey(OutgoingBatch batch, boolean useStagingDataWriter) { return useStagingDataWriter ? Long.toString(batch.getBatchId()) : batch.getNodeBatchId(); } private BatchLock acquireLock(OutgoingBatch batch, boolean useStagingDataWriter) { String semaphoreKey = getSemaphoreKey(batch, useStagingDataWriter); BatchLock lock = null; synchronized (DataExtractorService.this) { lock = locks.get(semaphoreKey); if (lock == null) { lock = new BatchLock(semaphoreKey); locks.put(semaphoreKey, lock); } lock.referenceCount++; } try { lock.acquire(); // In-memory, intra-process lock. if (isStagingFileLockRequired(batch)) { // File-system, inter-process lock for clustering. StagingFileLock fileLock = acquireStagingFileLock(batch); if (fileLock.isAcquired()) { lock.fileLock = fileLock; } else { // Didn't get the fileLock, ditch the in-memory lock as well. releaseLock(lock, batch, useStagingDataWriter); // So the next releaseLock() does not do anything with the lock lock = null; throw new SymmetricException("Failed to get extract lock on batch " + batch.getNodeBatchId()); } } } catch (InterruptedException e) { releaseLock(lock, batch, useStagingDataWriter); throw new org.jumpmind.exception.InterruptedException(e); } catch (Throwable e) { releaseLock(lock, batch, useStagingDataWriter); if (e instanceof SymmetricException) { throw (SymmetricException) e; } else { throw new SymmetricException(e); } } return lock; } protected StagingFileLock acquireStagingFileLock(OutgoingBatch batch) { boolean stagingFileAcquired = false; StagingFileLock fileLock = null; int iterations = 0; while (!stagingFileAcquired) { fileLock = stagingManager.acquireFileLock(getLockingServerInfo(), Constants.STAGING_CATEGORY_OUTGOING, batch.getStagedLocation(), batch.getBatchId()); stagingFileAcquired = fileLock.isAcquired(); if (!stagingFileAcquired) { if (fileLock.getLockFile() == null) { log.warn("Staging lock file not acquired " + fileLock.getLockFailureMessage()); return fileLock; } long lockAge = fileLock.getLockAge(); if (lockAge >= parameterService.getLong(ParameterConstants.LOCK_TIMEOUT_MS)) { log.warn("Lock {} in place for {} > about to BREAK the lock.", fileLock.getLockFile(), DurationFormatUtils.formatDurationWords(lockAge, true, true)); fileLock.breakLock(); } else { if ((iterations % 10) == 0) { log.info("Lock {} in place for {}, waiting...", fileLock.getLockFile(), DurationFormatUtils.formatDurationWords(lockAge, true, true)); } else { log.debug("Lock {} in place for {}, waiting...", fileLock.getLockFile(), DurationFormatUtils.formatDurationWords(lockAge, true, true)); } try { Thread.sleep(parameterService.getLong(ParameterConstants.LOCK_WAIT_RETRY_MILLIS)); } catch (InterruptedException ex) { log.debug("Interrupted.", ex); } } } iterations++; } return fileLock; } private String getLockingServerInfo() { return String.format("Server: '%s' Host: '%s' IP: '%s'", clusterService.getServerId(), AppUtils.getHostName(), AppUtils.getIpAddress()); } protected void releaseLock(BatchLock lock, OutgoingBatch batch, boolean useStagingDataWriter) { if (lock != null) { synchronized (DataExtractorService.this) { lock.referenceCount--; if (lock.referenceCount == 0) { locks.remove(lock.semaphoreKey); } lock.release(); } if (lock.fileLock != null) { lock.fileLock.releaseLock(); } } } protected boolean isStagingFileLockRequired(OutgoingBatch batch) { return batch.isCommonFlag() && parameterService.is(ParameterConstants.CLUSTER_STAGING_ENABLED); } protected void triggerReExtraction(OutgoingBatch currentBatch) { // Allow user to reset batch status to NE in the DB to trigger a batch re-extract IStagedResource resource = getStagedResource(currentBatch); if (resource != null) { resource.delete(); } } protected ExtractDataReader buildExtractDataReader(Node sourceNode, Node targetNode, OutgoingBatch currentBatch, ProcessInfo processInfo) { return new ExtractDataReader(symmetricDialect.getPlatform(), new SelectFromSymDataSource(currentBatch, sourceNode, targetNode, processInfo)); } protected ExtractDataReader buildExtractDataReader(Node sourceNode, Node targetNode, OutgoingBatch currentBatch, ProcessInfo processInfo, boolean containsBigLob) { return new ExtractDataReader(symmetricDialect.getPlatform(), new SelectFromSymDataSource(currentBatch, sourceNode, targetNode, processInfo, containsBigLob)); } protected Statistics getExtractStats(IDataWriter writer, OutgoingBatch currentBatch) { Map<Batch, Statistics> statisticsMap = null; if (writer instanceof TransformWriter) { statisticsMap = ((TransformWriter) writer).getNestedWriter().getStatistics(); } else { statisticsMap = writer.getStatistics(); } if (statisticsMap.size() > 0) { for (Entry<Batch, Statistics> entry : statisticsMap.entrySet()) { if (entry.getKey().getBatchId() == currentBatch.getBatchId()) { return entry.getValue(); } } } return null; } protected IDataWriter wrapWithTransformWriter(Node sourceNode, Node targetNode, ProcessInfo processInfo, IDataWriter dataWriter, boolean useStagingDataWriter) { TransformWriter transformExtractWriter = null; if (useStagingDataWriter) { long memoryThresholdInBytes = parameterService .getLong(ParameterConstants.STREAM_TO_FILE_THRESHOLD); transformExtractWriter = createTransformDataWriter( sourceNode, targetNode, new ProcessInfoDataWriter(new StagingDataWriter(memoryThresholdInBytes, true, nodeService .findIdentityNodeId(), Constants.STAGING_CATEGORY_OUTGOING, stagingManager, targetNode.allowCaptureTimeInProtocol(), parameterService.is(ParameterConstants.EXTRACT_ROW_CAPTURE_TIME, true)), processInfo)); } else { transformExtractWriter = createTransformDataWriter(sourceNode, targetNode, new ProcessInfoDataWriter(dataWriter, processInfo)); } return transformExtractWriter; } protected void cleanupIgnoredBatch(Node sourceNode, Node targetNode, OutgoingBatch currentBatch, IDataWriter writer) { Batch batch = new Batch(BatchType.EXTRACT, currentBatch.getBatchId(), currentBatch.getChannelId(), symmetricDialect.getBinaryEncoding(), sourceNode.getNodeId(), currentBatch.getNodeId(), currentBatch.isCommonFlag()); batch.setIgnored(true); try { IStagedResource resource = getStagedResource(currentBatch); if (resource != null) { resource.delete(); } DataContext ctx = new DataContext(batch); ctx.put("targetNode", targetNode); ctx.put("sourceNode", sourceNode); writer.open(ctx); writer.start(batch); writer.end(batch, false); } finally { writer.close(); } } protected IStagedResource getStagedResource(OutgoingBatch currentBatch) { return stagingManager.find(Constants.STAGING_CATEGORY_OUTGOING, currentBatch.getStagedLocation(), currentBatch.getBatchId()); } protected boolean isPreviouslyExtracted(OutgoingBatch currentBatch, boolean acquireReference) { IStagedResource previouslyExtracted = getStagedResource(currentBatch); if (previouslyExtracted != null && previouslyExtracted.getState() == State.DONE) { if (log.isDebugEnabled()) { log.debug("We have already extracted batch {}. Using the existing extraction: {}", currentBatch.getBatchId(), previouslyExtracted); } if (acquireReference) { previouslyExtracted.reference(); } return true; } return false; } protected boolean isRetry(OutgoingBatch currentBatch, Node remoteNode) { if (currentBatch.getSentCount() > 0 && currentBatch.getStatus() != OutgoingBatch.Status.RS) { boolean offline = parameterService.is(ParameterConstants.NODE_OFFLINE, false); boolean cclient = StringUtils.equals(remoteNode.getDeploymentType(), Constants.DEPLOYMENT_TYPE_CCLIENT); if (remoteNode.isVersionGreaterThanOrEqualTo(3, 8, 0) && !offline && !cclient) { IStagedResource previouslyExtracted = getStagedResource(currentBatch); return previouslyExtracted != null && previouslyExtracted.getState() == State.DONE; } } return false; } protected OutgoingBatch sendOutgoingBatch(ProcessInfo processInfo, Node targetNode, OutgoingBatch currentBatch, boolean isRetry, IDataWriter dataWriter, BufferedWriter writer, ExtractMode mode) { if (currentBatch.getStatus() != Status.OK || ExtractMode.EXTRACT_ONLY == mode) { currentBatch.setSentCount(currentBatch.getSentCount() + 1); if (currentBatch.getStatus() != Status.RS) { currentBatch.setTransferStartTime(new Date()); } long ts = System.currentTimeMillis(); IStagedResource extractedBatch = getStagedResource(currentBatch); if (extractedBatch != null) { processInfo.setTotalDataCount(currentBatch.getDataRowCount()); if (currentBatch.getLoadId() > 0) { processInfo.setCurrentTableName(currentBatch.getSummary()); } if (mode == ExtractMode.FOR_SYM_CLIENT && writer != null) { if (!isRetry && parameterService.is(ParameterConstants.OUTGOING_BATCH_COPY_TO_INCOMING_STAGING) && !parameterService.is(ParameterConstants.NODE_OFFLINE, false)) { ISymmetricEngine targetEngine = AbstractSymmetricEngine.findEngineByUrl(targetNode.getSyncUrl()); if (targetEngine != null && extractedBatch.isFileResource() && targetEngine.getParameterService().is(ParameterConstants.STREAM_TO_FILE_ENABLED)) { Node sourceNode = nodeService.findIdentity(); Node targetNodeByEngine = targetEngine.getNodeService().findIdentity(); if ((sourceNode != null && sourceNode.equals(targetNodeByEngine)) || (targetNodeByEngine != null && !targetNodeByEngine.equals(targetNode))) { log.warn("Target engine (NodeId {}) is the same engine as the current one and differs from the correct target (NodeId {}). This looks like a mis-configuration of the sync urls '{}'", targetNodeByEngine.getNodeId(), targetNode.getNodeId(), targetNode.getSyncUrl()); } else { IStagedResource targetResource = targetEngine.getStagingManager().create( Constants.STAGING_CATEGORY_INCOMING, Batch.getStagedLocation(false, sourceNode.getNodeId(), currentBatch.getBatchId()), currentBatch.getBatchId()); try { SymmetricUtils.copyFile(extractedBatch.getFile(), targetResource.getFile()); processInfo.setCurrentDataCount(currentBatch.getDataRowCount()); if(log.isDebugEnabled()) { log.debug("Copied file to incoming staging of remote engine {}", targetResource.getFile().getAbsolutePath()); } targetResource.setState(State.DONE); isRetry = true; if (currentBatch.getSentCount() == 1) { statisticManager.incrementDataSent(currentBatch.getChannelId(), currentBatch.getDataRowCount()); statisticManager.incrementDataBytesSent(currentBatch.getChannelId(), extractedBatch.getFile().length()); } } catch (Exception e) { FileUtils.deleteQuietly(targetResource.getFile()); throw new RuntimeException(e); } } } } Channel channel = configurationService.getChannel(currentBatch.getChannelId()); DataContext ctx = new DataContext(); transferFromStaging(mode, BatchType.EXTRACT, currentBatch, isRetry, extractedBatch, writer, ctx, channel.getMaxKBytesPerSecond(), processInfo); } else { IDataReader dataReader = new ProtocolDataReader(BatchType.EXTRACT, currentBatch.getNodeId(), extractedBatch); DataContext ctx = new DataContext(); ctx.put(Constants.DATA_CONTEXT_TARGET_NODE, targetNode); ctx.put(Constants.DATA_CONTEXT_SOURCE_NODE, nodeService.findIdentity()); new DataProcessor(dataReader, new ProcessInfoDataWriter(dataWriter, processInfo), "send from stage") .process(ctx); if (dataReader.getStatistics().size() > 0) { if (!isRetry) { Statistics stats = dataReader.getStatistics().values().iterator().next(); statisticManager.incrementDataSent(currentBatch.getChannelId(), stats.get(DataReaderStatistics.READ_RECORD_COUNT)); long byteCount = stats.get(DataReaderStatistics.READ_BYTE_COUNT); statisticManager.incrementDataBytesSent(currentBatch.getChannelId(), byteCount); } } else { log.warn("Could not find recorded statistics for batch {}", currentBatch.getNodeBatchId()); } } } else { throw new IllegalStateException(String.format( "Could not find the staged resource for batch %s", currentBatch.getNodeBatchId())); } currentBatch = requeryIfEnoughTimeHasPassed(ts, currentBatch); } return currentBatch; } protected void transferFromStaging(ExtractMode mode, BatchType batchType, OutgoingBatch batch, boolean isRetry, IStagedResource stagedResource, BufferedWriter writer, DataContext context, BigDecimal maxKBytesPerSec, ProcessInfo processInfo) { final int MAX_WRITE_LENGTH = 32768; BufferedReader reader = stagedResource.getReader(); try { // Retry means we've sent this batch before, so let's ask to // retry the batch from the target's staging if (isRetry) { String line = null; while ((line = reader.readLine()) != null) { if (line.startsWith(CsvConstants.BATCH)) { if (nodeService.findNode(batch.getNodeId(), true).isVersionGreaterThanOrEqualTo(3, 9, 0)) { writer.write(getBatchStatsColumns()); writer.newLine(); writer.write(getBatchStats(batch)); writer.newLine(); } writer.write(CsvConstants.RETRY + "," + batch.getBatchId()); writer.newLine(); writer.write(CsvConstants.COMMIT + "," + batch.getBatchId()); writer.newLine(); break; } else { writer.write(line); writer.newLine(); } } writer.flush(); processInfo.setCurrentDataCount(batch.getDataRowCount()); } else { long totalBytes = stagedResource.getSize(); long totalCharsRead = 0, totalBytesRead = 0; int numCharsRead = 0, numBytesRead = 0; long startTime = System.currentTimeMillis(), ts = startTime, bts = startTime; boolean isThrottled = maxKBytesPerSec != null && maxKBytesPerSec.compareTo(BigDecimal.ZERO) > 0; long totalThrottleTime = 0; int bufferSize = MAX_WRITE_LENGTH; if (isThrottled) { bufferSize = maxKBytesPerSec.multiply(new BigDecimal(1024)).intValue(); } char[] buffer = new char[bufferSize]; boolean batchStatsWritten = false; String prevBuffer = ""; while ((numCharsRead = reader.read(buffer)) != -1) { if (!batchStatsWritten && nodeService.findNode(batch.getNodeId(), true).isVersionGreaterThanOrEqualTo(3, 9, 0)) { batchStatsWritten = writeBatchStats(writer, buffer, numCharsRead, prevBuffer, batch); prevBuffer = new String(buffer); } else { writer.write(buffer, 0, numCharsRead); } totalCharsRead += numCharsRead; if (Thread.currentThread().isInterrupted()) { throw new IoException("This thread was interrupted"); } long batchStatusUpdateMillis = parameterService.getLong(ParameterConstants.OUTGOING_BATCH_UPDATE_STATUS_MILLIS); if (System.currentTimeMillis() - ts > batchStatusUpdateMillis && batch.getStatus() != Status.SE && batch.getStatus() != Status.RS) { changeBatchStatus(Status.SE, batch, mode); } if (System.currentTimeMillis() - ts > LOG_PROCESS_SUMMARY_THRESHOLD) { log.info( "Batch '{}', for node '{}', for process 'send from stage' has been processing for {} seconds. " + "The following stats have been gathered: {}", new Object[] { batch.getBatchId(), batch.getNodeId(), (System.currentTimeMillis() - startTime) / 1000, "CHARS=" + totalCharsRead }); ts = System.currentTimeMillis(); } if (isThrottled) { numBytesRead += new String(buffer, 0, numCharsRead).getBytes().length; totalBytesRead += numBytesRead; if (numBytesRead >= bufferSize) { long expectedMillis = (long) (((numBytesRead / 1024f) / maxKBytesPerSec.floatValue()) * 1000); long actualMillis = System.currentTimeMillis() - bts; if (actualMillis < expectedMillis) { totalThrottleTime += expectedMillis - actualMillis; Thread.sleep(expectedMillis - actualMillis); } numBytesRead = 0; bts = System.currentTimeMillis(); } } else { totalBytesRead += new String(buffer, 0, numCharsRead).getBytes().length; } processInfo.setCurrentDataCount((long) ((totalBytesRead / (double) totalBytes) * batch.getDataRowCount())); } if (batch.getSentCount() == 1) { statisticManager.incrementDataSent(batch.getChannelId(), batch.getDataRowCount()); statisticManager.incrementDataBytesSent(batch.getChannelId(), totalBytesRead); } if (log.isDebugEnabled() && totalThrottleTime > 0) { log.debug("Batch '{}' for node '{}' took {}ms for {} bytes and was throttled for {}ms because limit is set to {} KB/s", batch.getBatchId(), batch.getNodeId(), (System.currentTimeMillis() - startTime), totalBytesRead, totalThrottleTime, maxKBytesPerSec); } } if (writer instanceof BatchBufferedWriter) { ((BatchBufferedWriter)writer).getBatchIds().add(batch.getBatchId()); } } catch (Throwable t) { throw new RuntimeException(t); } finally { stagedResource.close(); stagedResource.dereference(); } } protected int findStatsIndex(String bufferString, String prevBuffer) { int index = -1; String fullBuffer = prevBuffer + bufferString; String pattern = "\n" + CsvConstants.BATCH + "\\s*,\\s*\\d*\r*\n"; Pattern r = Pattern.compile(pattern); Matcher m = r.matcher(fullBuffer); if (m.find()) { String group = m.group(0); int start = fullBuffer.indexOf(group); if (start + group.length() > prevBuffer.length()) { index = start + group.length() - prevBuffer.length(); } else { index = start + group.length(); } } return index; } @Override public List<ExtractRequest> getPendingTablesForExtractByLoadId(long loadId) { return sqlTemplate.query(getSql("selectIncompleteTablesForExtractByLoadId"), new ExtractRequestMapper(), loadId); } @Override public List<ExtractRequest> getCompletedTablesForExtractByLoadId(long loadId) { return sqlTemplate.query(getSql("selectCompletedTablesForExtractByLoadId"), new ExtractRequestMapper(), loadId); } @Override public void updateExtractRequestLoadTime(ISqlTransaction transaction, Date loadTime, OutgoingBatch outgoingBatch) { transaction.prepareAndExecute(getSql("updateExtractRequestLoadTime"), outgoingBatch.getBatchId(), outgoingBatch.getReloadRowCount() > 0 ? outgoingBatch.getDataRowCount() : 0, outgoingBatch.getLoadMillis(), outgoingBatch.getBatchId(), outgoingBatch.getBatchId(), outgoingBatch.getBatchId(), outgoingBatch.getNodeId(), outgoingBatch.getLoadId()); TableReloadStatus status = dataService.updateTableReloadStatusDataLoaded(transaction, outgoingBatch.getLoadId(), outgoingBatch.getBatchId(), 1); if (status != null && status.isFullLoad() && (status.isCancelled() || status.isCompleted())) { log.info("Initial load ended for node {}", outgoingBatch.getNodeId()); nodeService.setInitialLoadEnded(transaction, outgoingBatch.getNodeId()); } } @Override public void updateExtractRequestTransferred(OutgoingBatch batch, long transferMillis) { ISqlTransaction transaction = null; try { transaction = sqlTemplate.startSqlTransaction(); transaction.prepareAndExecute(getSql("updateExtractRequestTransferred"), batch.getBatchId(), batch.getDataRowCount(), transferMillis, batch.getBatchId(), batch.getBatchId(), batch.getNodeId(), batch.getLoadId(), batch.getBatchId()); transaction.commit(); } catch (Error ex) { if (transaction != null) { transaction.rollback(); } throw ex; } catch (RuntimeException ex) { if (transaction != null) { transaction.rollback(); } throw ex; } finally { close(transaction); } } @Override public int cancelExtractRequests(long loadId) { return sqlTemplate.update(getSql("cancelExtractRequests"), ExtractStatus.OK.name(), loadId); } protected boolean writeBatchStats(BufferedWriter writer, char[] buffer, int bufferSize, String prevBuffer, OutgoingBatch batch) throws IOException { String bufferString = new String(buffer); int index = findStatsIndex(bufferString, prevBuffer); if (index > 0) { char prefix[] = Arrays.copyOf(buffer, index); writer.write(prefix, 0, index); } if (index > -1) { String stats = getBatchStatsColumns() + System.lineSeparator() + getBatchStats(batch) + System.lineSeparator(); char statsBuffer[] = stats.toCharArray(); writer.write(statsBuffer, 0, statsBuffer.length); char suffix[] = Arrays.copyOfRange(buffer, index, buffer.length); writer.write(suffix, 0, bufferSize - index); } else { writer.write(buffer, 0, bufferSize); } return index > -1; } protected String getBatchStatsColumns() { return StringUtils.join(new String[] { CsvConstants.STATS_COLUMNS, DataReaderStatistics.LOAD_FLAG, DataReaderStatistics.EXTRACT_COUNT, DataReaderStatistics.SENT_COUNT, DataReaderStatistics.LOAD_COUNT, DataReaderStatistics.LOAD_ID, DataReaderStatistics.COMMON_FLAG, DataReaderStatistics.ROUTER_MILLIS, DataReaderStatistics.EXTRACT_MILLIS, DataReaderStatistics.TRANSFORM_EXTRACT_MILLIS, DataReaderStatistics.TRANSFORM_LOAD_MILLIS, DataReaderStatistics.RELOAD_ROW_COUNT, DataReaderStatistics.OTHER_ROW_COUNT, DataReaderStatistics.DATA_ROW_COUNT, DataReaderStatistics.DATA_INSERT_ROW_COUNT, DataReaderStatistics.DATA_UPDATE_ROW_COUNT, DataReaderStatistics.DATA_DELETE_ROW_COUNT, DataReaderStatistics.EXTRACT_ROW_COUNT, DataReaderStatistics.EXTRACT_INSERT_ROW_COUNT, DataReaderStatistics.EXTRACT_UPDATE_ROW_COUNT, DataReaderStatistics.EXTRACT_DELETE_ROW_COUNT, DataReaderStatistics.FAILED_DATA_ID }, ','); } protected String getBatchStats(OutgoingBatch batch) { return StringUtils.join(new String[] { CsvConstants.STATS, String.valueOf(batch.isLoadFlag() ? 1 : 0), String.valueOf(batch.getExtractCount()), String.valueOf(batch.getSentCount()), String.valueOf(batch.getLoadCount()), String.valueOf(batch.getLoadId()), String.valueOf(batch.isCommonFlag() ? 1 : 0), String.valueOf(batch.getRouterMillis()), String.valueOf(batch.getExtractMillis()), String.valueOf(batch.getTransformExtractMillis()), String.valueOf(batch.getTransformLoadMillis()), String.valueOf(batch.getReloadRowCount()), String.valueOf(batch.getOtherRowCount()), String.valueOf(batch.getDataRowCount()), String.valueOf(batch.getDataInsertRowCount()), String.valueOf(batch.getDataUpdateRowCount()), String.valueOf(batch.getDataDeleteRowCount()), String.valueOf(batch.getExtractRowCount()), String.valueOf(batch.getExtractInsertRowCount()), String.valueOf(batch.getExtractUpdateRowCount()), String.valueOf(batch.getExtractDeleteRowCount()), String.valueOf(batch.getFailedDataId()) }, ','); } public boolean extractBatchRange(Writer writer, String nodeId, long startBatchId, long endBatchId) { boolean foundBatch = false; Node sourceNode = nodeService.findIdentity(); for (long batchId = startBatchId; batchId <= endBatchId; batchId++) { OutgoingBatch batch = outgoingBatchService.findOutgoingBatch(batchId, nodeId); if (batch != null) { Node targetNode = nodeService.findNode(nodeId, true); if (targetNode == null && Constants.UNROUTED_NODE_ID.equals(nodeId)) { targetNode = new Node(); targetNode.setNodeId("-1"); } if (targetNode != null) { IDataReader dataReader = new ExtractDataReader(symmetricDialect.getPlatform(), new SelectFromSymDataSource(batch, sourceNode, targetNode, new ProcessInfo())); DataContext ctx = new DataContext(); ctx.put(Constants.DATA_CONTEXT_TARGET_NODE, targetNode); ctx.put(Constants.DATA_CONTEXT_SOURCE_NODE, nodeService.findIdentity()); new DataProcessor(dataReader, createTransformDataWriter( nodeService.findIdentity(), targetNode, new ProtocolDataWriter(nodeService.findIdentityNodeId(), writer, targetNode.requires13Compatiblity(), false, false)), "extract range").process(ctx); foundBatch = true; } } } return foundBatch; } public boolean extractBatchRange(Writer writer, String nodeId, Date startBatchTime, Date endBatchTime, String... channelIds) { boolean foundBatch = false; Node sourceNode = nodeService.findIdentity(); OutgoingBatches batches = outgoingBatchService.getOutgoingBatchRange(nodeId, startBatchTime, endBatchTime, channelIds); List<OutgoingBatch> list = batches.getBatches(); for (OutgoingBatch outgoingBatch : list) { Node targetNode = nodeService.findNode(nodeId, true); if (targetNode == null && Constants.UNROUTED_NODE_ID.equals(nodeId)) { targetNode = new Node(); targetNode.setNodeId("-1"); } if (targetNode != null) { IDataReader dataReader = new ExtractDataReader(symmetricDialect.getPlatform(), new SelectFromSymDataSource(outgoingBatch, sourceNode, targetNode, new ProcessInfo())); DataContext ctx = new DataContext(); ctx.put(Constants.DATA_CONTEXT_TARGET_NODE, targetNode); ctx.put(Constants.DATA_CONTEXT_SOURCE_NODE, nodeService.findIdentity()); new DataProcessor(dataReader, createTransformDataWriter(nodeService.findIdentity(), targetNode, new ProtocolDataWriter(nodeService.findIdentityNodeId(), writer, targetNode.requires13Compatiblity(), false, false)), "extract range").process(ctx); foundBatch = true; } } return foundBatch; } protected TransformWriter createTransformDataWriter(Node identity, Node targetNode, IDataWriter extractWriter) { List<TransformTableNodeGroupLink> transformsList = null; if (targetNode != null) { transformsList = transformService.findTransformsFor( new NodeGroupLink(identity.getNodeGroupId(), targetNode.getNodeGroupId()), TransformPoint.EXTRACT); } TransformTable[] transforms = transformsList != null ? transformsList .toArray(new TransformTable[transformsList.size()]) : null; TransformWriter transformExtractWriter = new TransformWriter(symmetricDialect.getTargetPlatform(), TransformPoint.EXTRACT, extractWriter, transformService.getColumnTransforms(), transforms); return transformExtractWriter; } protected Table lookupAndOrderColumnsAccordingToTriggerHistory(String routerId, TriggerHistory triggerHistory, Node sourceNode, Node targetNode, boolean setTargetTableName, boolean useDatabaseDefinition, boolean addMissingColumns) { String catalogName = triggerHistory.getSourceCatalogName(); String schemaName = triggerHistory.getSourceSchemaName(); String tableName = triggerHistory.getSourceTableName(); Table table = null; if (useDatabaseDefinition) { table = getTargetPlatform(tableName).getTableFromCache(catalogName, schemaName, tableName, false); if (table != null && table.getColumnCount() < triggerHistory.getParsedColumnNames().length) { /* * If the column count is less than what trigger history reports, then * chances are the table cache is out of date. */ table = getTargetPlatform(tableName).getTableFromCache(catalogName, schemaName, tableName, true); } if (table != null) { table = table.copyAndFilterColumns(triggerHistory.getParsedColumnNames(), triggerHistory.getParsedPkColumnNames(), true, addMissingColumns); } else { throw new SymmetricException("Could not find the following table. It might have been dropped: %s", Table.getFullyQualifiedTableName(catalogName, schemaName, tableName)); } } else { table = new Table(tableName); table.addColumns(triggerHistory.getParsedColumnNames()); table.setPrimaryKeys(triggerHistory.getParsedPkColumnNames()); } Router router = triggerRouterService.getRouterById(routerId, false); if (router != null && setTargetTableName) { if (router.isUseSourceCatalogSchema()) { table.setCatalog(catalogName); table.setSchema(schemaName); } else { table.setCatalog(null); table.setSchema(null); } if (StringUtils.equals(Constants.NONE_TOKEN, router.getTargetCatalogName())) { table.setCatalog(null); } else if (StringUtils.isNotBlank(router.getTargetCatalogName())) { table.setCatalog(replaceVariables(sourceNode, targetNode, router.getTargetCatalogName())); } if (StringUtils.equals(Constants.NONE_TOKEN, router.getTargetSchemaName())) { table.setSchema(null); } else if (StringUtils.isNotBlank(router.getTargetSchemaName())) { table.setSchema(replaceVariables(sourceNode, targetNode, router.getTargetSchemaName())); } if (StringUtils.isNotBlank(router.getTargetTableName())) { table.setName(router.getTargetTableName()); } } return table; } protected String replaceVariables(Node sourceNode, Node targetNode, String str) { str = FormatUtils.replace("sourceNodeId", sourceNode.getNodeId(), str); str = FormatUtils.replace("sourceExternalId", sourceNode.getExternalId(), str); str = FormatUtils.replace("sourceNodeGroupId", sourceNode.getNodeGroupId(), str); str = FormatUtils.replace("targetNodeId", targetNode.getNodeGroupId(), str); str = FormatUtils.replace("targetExternalId", targetNode.getExternalId(), str); str = FormatUtils.replace("targetNodeGroupId", targetNode.getNodeGroupId(), str); return str; } public RemoteNodeStatuses queueWork(boolean force) { final RemoteNodeStatuses statuses = new RemoteNodeStatuses(configurationService.getChannels(false)); Node identity = nodeService.findIdentity(); if (identity != null) { if (force || clusterService.lock(ClusterConstants.INITIAL_LOAD_EXTRACT)) { try { List<NodeQueuePair> nodes = getExtractRequestNodes(); for (NodeQueuePair pair : nodes) { clusterService.refreshLock(ClusterConstants.INITIAL_LOAD_EXTRACT); queue(pair.getNodeId(), pair.getQueue(), statuses); } } finally { if (!force) { clusterService.unlock(ClusterConstants.INITIAL_LOAD_EXTRACT); } } } } else { log.debug("Not running initial load extract service because this node does not have an identity"); } return statuses; } protected void queue(String nodeId, String queue, RemoteNodeStatuses statuses) { final NodeCommunication.CommunicationType TYPE = NodeCommunication.CommunicationType.EXTRACT; int availableThreads = nodeCommunicationService.getAvailableThreads(TYPE); NodeCommunication lock = nodeCommunicationService.find(nodeId, queue, TYPE); if (availableThreads > 0) { nodeCommunicationService.execute(lock, statuses, this); } } public List<NodeQueuePair> getExtractRequestNodes() { return sqlTemplate.query(getSql("selectNodeIdsForExtractSql"), new NodeQueuePairMapper(), ExtractStatus.NE.name()); } private static class NodeQueuePair { private String nodeId; private String queue; public String getNodeId() { return nodeId; } public void setNodeId(String nodeId) { this.nodeId = nodeId; } public String getQueue() { return queue; } public void setQueue(String queue) { this.queue = queue; } } static class NodeQueuePairMapper implements ISqlRowMapper<NodeQueuePair> { @Override public NodeQueuePair mapRow(Row row) { NodeQueuePair pair = new NodeQueuePair(); pair.setNodeId(row.getString("node_id")); pair.setQueue(row.getString("queue")); return pair; } } protected List<ExtractRequest> getExtractRequestsForNode(NodeCommunication nodeCommunication) { return sqlTemplate.query(getSql("selectExtractRequestForNodeSql"), new ExtractRequestMapper(), nodeCommunication.getNodeId(), nodeCommunication.getQueue(), ExtractRequest.ExtractStatus.NE.name()); } protected ExtractRequest getExtractRequestForBatch(OutgoingBatch batch) { return sqlTemplate.queryForObject(getSql("selectExtractRequestForBatchSql"), new ExtractRequestMapper(), batch.getBatchId(), batch.getBatchId(), batch.getNodeId(), batch.getLoadId()); } protected Map<Long, List<ExtractRequest>> getExtractChildRequestsForNode(NodeCommunication nodeCommunication, List<ExtractRequest> parentRequests) { Map<Long, List<ExtractRequest>> requests = new HashMap<Long, List<ExtractRequest>>(); List<ExtractRequest> childRequests = sqlTemplate.query(getSql("selectExtractChildRequestForNodeSql"), new ExtractRequestMapper(), nodeCommunication.getNodeId(), nodeCommunication.getQueue(), ExtractRequest.ExtractStatus.NE.name()); for (ExtractRequest childRequest: childRequests) { List<ExtractRequest> childList = requests.get(childRequest.getParentRequestId()); if (childList == null) { childList = new ArrayList<ExtractRequest>(); requests.put(childRequest.getParentRequestId(), childList); } childList.add(childRequest); } return requests; } protected List<ExtractRequest> getExtractChildRequestsForNode(ExtractRequest parentRequest) { return sqlTemplate.query(getSql("selectExtractChildRequestsByParentSql"), new ExtractRequestMapper(), parentRequest.getRequestId()); } @Override public void resetExtractRequest(OutgoingBatch batch) { ExtractRequest request = getExtractRequestForBatch(batch); if (request != null) { List<ProcessInfo> infos = statisticManager.getProcessInfos(); for (ProcessInfo info : infos) { if (info.getProcessType().equals(ProcessType.INITIAL_LOAD_EXTRACT_JOB) && request.getNodeId().equals(info.getTargetNodeId()) && info.getCurrentBatchId() >= request.getStartBatchId() && info.getCurrentBatchId() <= request.getEndBatchId()) { log.info("Sending interrupt to " + info.getKey().toString()); info.getThread().interrupt(); } } List<OutgoingBatch> batches = outgoingBatchService.getOutgoingBatchRange(request.getStartBatchId(), request.getEndBatchId()).getBatches(); List<ExtractRequest> childRequests = null; if (request.getParentRequestId() == 0) { childRequests = getExtractChildRequestsForNode(request); } restartExtractRequest(batches, request, childRequests); } else { log.warn("Unable to find extract request for node {} batch {} load {}", batch.getNodeId(), batch.getBatchId(), batch.getLoadId()); } } public ExtractRequest requestExtractRequest(ISqlTransaction transaction, String nodeId, String queue, TriggerRouter triggerRouter, long startBatchId, long endBatchId, long loadId, String table, long rows, long parentRequestId) { long requestId = 0; if (platform.supportsMultiThreadedTransactions()) { requestId = sequenceService.nextVal(Constants.SEQUENCE_EXTRACT_REQ); } else { requestId = sequenceService.nextVal(transaction, Constants.SEQUENCE_EXTRACT_REQ); } transaction.prepareAndExecute(getSql("insertExtractRequestSql"), new Object[] { requestId, nodeId, queue, ExtractStatus.NE.name(), startBatchId, endBatchId, triggerRouter.getTrigger().getTriggerId(), triggerRouter.getRouter().getRouterId(), loadId, table, rows, parentRequestId }, new int[] { Types.BIGINT, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.BIGINT, Types.BIGINT, Types.VARCHAR, Types.VARCHAR, Types.BIGINT, Types.VARCHAR, Types.BIGINT, Types.BIGINT }); ExtractRequest request = new ExtractRequest(); request.setRequestId(requestId); request.setNodeId(nodeId); request.setQueue(queue); request.setStatus(ExtractStatus.NE); request.setStartBatchId(startBatchId); request.setEndBatchId(endBatchId); request.setRouterId(triggerRouter.getRouterId()); request.setLoadId(loadId); request.setTableName(table); request.setRows(rows); request.setParentRequestId(parentRequestId); return request; } protected void updateExtractRequestStatus(ISqlTransaction transaction, long extractId, ExtractStatus status, long extractedRows, long extractedMillis) { transaction.prepareAndExecute(getSql("updateExtractRequestStatus"), status.name(), extractedRows, extractedMillis, extractId); } protected boolean canProcessExtractRequest(ExtractRequest request, CommunicationType communicationType) { return !request.getTableName().equalsIgnoreCase(TableConstants.getTableName(tablePrefix, TableConstants.SYM_FILE_SNAPSHOT)); } /** * This is a callback method used by the NodeCommunicationService that extracts an initial load * in the background. */ public void execute(NodeCommunication nodeCommunication, RemoteNodeStatus status) { if (!isApplicable(nodeCommunication)) { log.debug("{} failed isApplicable check and will not run.", this); return; } List<ExtractRequest> requests = getExtractRequestsForNode(nodeCommunication); Map<Long, List<ExtractRequest>> allChildRequests = null; long ts = System.currentTimeMillis(); if (requests.size() > 0) { allChildRequests = getExtractChildRequestsForNode(nodeCommunication, requests); } /* * Process extract requests until it has taken longer than 30 seconds, and then * allow the process to return so progress status can be seen. */ for (int i = 0; i < requests.size() && (System.currentTimeMillis() - ts) <= Constants.LONG_OPERATION_THRESHOLD; i++) { ExtractRequest request = requests.get(i); if (!canProcessExtractRequest(request, nodeCommunication.getCommunicationType())){ continue; } Node identity = nodeService.findIdentity(); Node targetNode = nodeService.findNode(nodeCommunication.getNodeId(), true); log.info("Starting request {} to extract table {} into batches {} through {} for node {}.", new Object[] { request.getRequestId(), request.getTableName(), request.getStartBatchId(), request.getEndBatchId(), request.getNodeId() }); List<OutgoingBatch> batches = outgoingBatchService.getOutgoingBatchRange(request.getStartBatchId(), request.getEndBatchId()).getBatches(); ProcessInfo processInfo = statisticManager.newProcessInfo(new ProcessInfoKey(identity .getNodeId(), nodeCommunication.getQueue(), nodeCommunication.getNodeId(), getProcessType())); processInfo.setTotalBatchCount(batches.size()); List<ExtractRequest> childRequests = allChildRequests.get(request.getRequestId()); try { boolean isCanceled = true; boolean isRestarted = false; for (OutgoingBatch outgoingBatch : batches) { if (outgoingBatch.getStatus() != Status.OK && outgoingBatch.getStatus() != Status.IG) { isCanceled = false; } if (outgoingBatch.getStatus() != Status.RQ) { isRestarted = true; } } if (!isCanceled) { Channel channel = configurationService.getChannel(batches.get(0).getChannelId()); /* * "Trick" the extractor to extract one reload batch, but we * will split it across the N batches when writing it */ OutgoingBatch firstBatch = batches.get(0); processInfo.setCurrentLoadId(firstBatch.getLoadId()); processInfo.setStatus(ProcessStatus.QUERYING); if (isRestarted) { restartExtractRequest(batches, request, childRequests); } MultiBatchStagingWriter multiBatchStagingWriter = buildMultiBatchStagingWriter(request, childRequests, identity, targetNode, batches, processInfo, channel, isRestarted); extractOutgoingBatch(processInfo, targetNode, multiBatchStagingWriter, firstBatch, false, false, ExtractMode.FOR_SYM_CLIENT, new ClusterLockRefreshListener(clusterService)); checkSendDeferredConstraints(request, childRequests, targetNode); } else { log.info("Batches already had an OK status for request {} to extract table {} for batches {} through {} for node {}. Not extracting.", new Object[] { request.getRequestId(), request.getTableName(), request.getStartBatchId(), request.getEndBatchId(), request.getNodeId() }); } ISqlTransaction transaction = null; try { transaction = sqlTemplate.startSqlTransaction(); long extractMillis = new Date().getTime() - processInfo.getStartTime().getTime(); updateExtractRequestStatus(transaction, request.getRequestId(), ExtractStatus.OK, processInfo.getCurrentDataCount(), extractMillis); if (childRequests != null) { for (ExtractRequest childRequest : childRequests) { updateExtractRequestStatus(transaction, childRequest.getRequestId(), ExtractStatus.OK, processInfo.getCurrentDataCount(), extractMillis); } } transaction.commit(); log.info("Done with request {} to extract table {} into batches {} through {} for node {}", request.getRequestId(), request.getTableName(), request.getStartBatchId(), request.getEndBatchId(), request.getNodeId()); } catch (Error ex) { if (transaction != null) { transaction.rollback(); } throw ex; } catch (RuntimeException ex) { if (transaction != null) { transaction.rollback(); } throw ex; } finally { close(transaction); } releaseMissedExtractRequests(); processInfo.setStatus(ProcessInfo.ProcessStatus.OK); } catch (CancellationException ex) { log.info("Interrupted on request {} to extract table {} for batches {} through {} for node {}", new Object[] { request.getRequestId(), request.getTableName(), request.getStartBatchId(), request.getEndBatchId(), request.getNodeId() }); processInfo.setStatus(ProcessInfo.ProcessStatus.OK); } catch (RuntimeException ex) { log.warn("Failed on request {} to extract table {} into batches {} through {} for node {}", new Object[] { request.getRequestId(), request.getTableName(), request.getStartBatchId(), request.getEndBatchId(), request.getNodeId() }); processInfo.setStatus(ProcessInfo.ProcessStatus.ERROR); if (ex instanceof StagingLowFreeSpace) { log.error("Extract load is disabled because disk is almost full: {}", ex.getMessage()); break; } else { throw ex; } } } } protected void restartExtractRequest(List<OutgoingBatch> batches, ExtractRequest request, List<ExtractRequest> childRequests) { /* * This extract request was interrupted and must start over */ log.info("Resetting status of request {} to extract table {} into batches {} through {} for node {}", request.getRequestId(), request.getTableName(), request.getStartBatchId(), request.getEndBatchId(), request.getNodeId()); long batchLoadedCount = 0; if (request.getLastLoadedBatchId() > 0) { batchLoadedCount = request.getLastLoadedBatchId() - request.getStartBatchId() + 1; } long rowLoadedCount = request.getLoadedRows(); List<ExtractRequest> allRequests = new ArrayList<ExtractRequest>(); allRequests.add(request); if (childRequests != null) { allRequests.addAll(childRequests); } ISqlTransaction transaction = null; try { transaction = sqlTemplate.startSqlTransaction(); for (ExtractRequest extractRequest : allRequests) { // reset statistics for extract request transaction.prepareAndExecute(getSql("restartExtractRequest"), ExtractStatus.NE.name(), extractRequest.getRequestId(), extractRequest.getNodeId()); // back out statistics from table reload request if (batchLoadedCount > 0 || rowLoadedCount > 0) { dataService.updateTableReloadStatusDataLoaded(transaction, extractRequest.getLoadId(), extractRequest.getStartBatchId(), (int) batchLoadedCount * -1); } // set status of batches back to requested outgoingBatchService.updateOutgoingBatchStatus(transaction, Status.RQ, extractRequest.getNodeId(), extractRequest.getStartBatchId(), extractRequest.getEndBatchId()); } transaction.commit(); } catch (Error ex) { if (transaction != null) { transaction.rollback(); } throw ex; } catch (RuntimeException ex) { if (transaction != null) { transaction.rollback(); } throw ex; } finally { close(transaction); } // remove the batches from staging for (OutgoingBatch outgoingBatch : batches) { IStagedResource resource = getStagedResource(outgoingBatch); if (resource != null) { resource.delete(); } if (childRequests != null) { long batchIndex = outgoingBatch.getBatchId() - request.getStartBatchId(); for (ExtractRequest extractRequest : childRequests) { OutgoingBatch childBatch = new OutgoingBatch(extractRequest.getNodeId(), outgoingBatch.getChannelId(), outgoingBatch.getStatus()); childBatch.setBatchId(outgoingBatch.getBatchId() + batchIndex); resource = getStagedResource(childBatch); if (resource != null) { resource.delete(); } } } } // clear the incoming batch table for the batches at the target node, so the batches won't be skipped String symIncomingBatch = TableConstants.getTableName(parameterService.getTablePrefix(), TableConstants.SYM_INCOMING_BATCH); String nodeIdentityId = nodeService.findIdentityNodeId(); for (ExtractRequest extractRequest : allRequests) { String sql = "delete from " + symIncomingBatch + " where node_id = '" + nodeIdentityId + "' and batch_id between " + extractRequest.getStartBatchId() + " and " + extractRequest.getEndBatchId(); dataService.sendSQL(extractRequest.getNodeId(), sql); } for (ExtractRequest extractRequest : allRequests) { TableReloadStatus reloadStatus = dataService.getTableReloadStatusByLoadId(extractRequest.getLoadId()); OutgoingBatches setupBatches = outgoingBatchService.getOutgoingBatchByLoadRangeAndTable(extractRequest.getLoadId(), 1, reloadStatus.getStartDataBatchId() - 1, extractRequest.getTableName().toLowerCase()); // clear incoming batch table for all batches at the target node that were used to setup this load for a specific table (delete, truncate, etc) for (OutgoingBatch batch : setupBatches.getBatches()) { String sql = "delete from " + symIncomingBatch + " where node_id = '" + nodeIdentityId + "' and batch_id = " + batch.getBatchId(); dataService.sendSQL(batch.getNodeId(), sql); // set status of these batches back to new so they are resent batch.setStatus(Status.NE); outgoingBatchService.updateOutgoingBatch(batch); } } } public void releaseMissedExtractRequests() { List<Long> requestIds = sqlTemplateDirty.query(getSql("selectExtractChildRequestIdsMissed"), new LongMapper(), Status.NE.name(), Status.OK.name()); if (requestIds != null && requestIds.size() > 0) { log.info("Releasing {} child extract requests that missed processing by parent node", requestIds.size()); for (Long requestId : requestIds) { sqlTemplate.update(getSql("releaseExtractChildRequestFromParent"), requestId); } } } protected void checkSendDeferredConstraints(ExtractRequest request, List<ExtractRequest> childRequests, Node targetNode) { if (parameterService.is(ParameterConstants.INITIAL_LOAD_DEFER_CREATE_CONSTRAINTS, false)) { TableReloadRequest reloadRequest = dataService.getTableReloadRequest(request.getLoadId(), request.getTriggerId(), request.getRouterId()); if ((reloadRequest != null && reloadRequest.isCreateTable()) || (reloadRequest == null && parameterService.is(ParameterConstants.INITIAL_LOAD_CREATE_SCHEMA_BEFORE_RELOAD))) { boolean success = false; Trigger trigger = triggerRouterService.getTriggerById(request.getTriggerId()); if (trigger != null) { List<TriggerHistory> histories = triggerRouterService.getActiveTriggerHistories(triggerRouterService.getTriggerById(request.getTriggerId())); if (histories != null && histories.size() > 0) { for (TriggerHistory history : histories) { Channel channel = configurationService.getChannel(trigger.getReloadChannelId()); if (!channel.isFileSyncFlag() && history.getSourceTableName().equalsIgnoreCase(request.getTableName())) { Data data = new Data(history.getSourceTableName(), DataEventType.CREATE, null, String.valueOf(request.getLoadId()), history, trigger.getChannelId(), null, null); data.setNodeList(targetNode.getNodeId()); dataService.insertData(data); if (childRequests != null) { for (ExtractRequest childRequest : childRequests) { data = new Data(history.getSourceTableName(), DataEventType.CREATE, null, String.valueOf(childRequest.getLoadId()), history, trigger.getChannelId(), null, null); data.setNodeList(childRequest.getNodeId()); dataService.insertData(data); } } } } success = true; } } if (!success) { log.warn("Unable to send deferred constraints for trigger '{}' router '{}' in load {}", reloadRequest.getTriggerId(), reloadRequest.getRouterId(), reloadRequest.getLoadId()); } } } } protected boolean isApplicable(NodeCommunication nodeCommunication) { return nodeCommunication.getCommunicationType() != CommunicationType.FILE_XTRCT; } protected MultiBatchStagingWriter buildMultiBatchStagingWriter(ExtractRequest request, List<ExtractRequest> childRequests, Node sourceNode, Node targetNode, List<OutgoingBatch> batches, ProcessInfo processInfo, Channel channel, boolean isRestarted) { MultiBatchStagingWriter multiBatchStatingWriter = new MultiBatchStagingWriter(this, request, childRequests, sourceNode.getNodeId(), stagingManager, batches, channel.getMaxBatchSize(), processInfo, isRestarted); return multiBatchStatingWriter; } protected ProcessType getProcessType() { return ProcessType.INITIAL_LOAD_EXTRACT_JOB; } protected boolean hasLobsThatNeedExtract(Table table, CsvData data) { if (table.containsLobColumns(platform)) { String[] colNames = table.getColumnNames(); Map<String, String> colMap = data.toColumnNameValuePairs(colNames, CsvData.ROW_DATA); List<Column> lobColumns = table.getLobColumns(platform); for (Column c : lobColumns) { String value = colMap.get(c.getName()); if (value != null && (value.equals("\b") || value.equals("08"))) { return true; } } } return false; } class ExtractRequestMapper implements ISqlRowMapper<ExtractRequest> { public ExtractRequest mapRow(Row row) { ExtractRequest request = new ExtractRequest(); request.setNodeId(row.getString("node_id")); request.setRequestId(row.getLong("request_id")); request.setStartBatchId(row.getLong("start_batch_id")); request.setEndBatchId(row.getLong("end_batch_id")); request.setStatus(ExtractStatus.valueOf(row.getString("status").toUpperCase())); request.setCreateTime(row.getDateTime("create_time")); request.setLastUpdateTime(row.getDateTime("last_update_time")); request.setTriggerId(row.getString("trigger_id")); request.setRouterId(row.getString("router_id")); request.setTriggerRouter(triggerRouterService.findTriggerRouterById( row.getString("trigger_id"), row.getString("router_id"), false)); request.setQueue(row.getString("queue")); request.setLoadId(row.getLong("load_id")); request.setTableName(row.getString("table_name")); request.setRows(row.getLong("total_rows")); request.setTransferredRows(row.getLong("transferred_rows")); request.setLastTransferredBatchId(row.getLong("last_transferred_batch_id")); request.setLoadedRows(row.getLong("loaded_rows")); request.setLastLoadedBatchId(row.getLong("last_loaded_batch_id")); request.setTransferredMillis(row.getLong("transferred_millis")); request.setLoadedMillis(row.getLong("loaded_millis")); request.setParentRequestId(row.getLong("parent_request_id")); request.setExtractedRows(row.getLong("extracted_rows")); request.setExtractedMillis(row.getLong("extracted_millis")); return request; } } class ColumnsAccordingToTriggerHistory { class CacheKey{ private String routerId; private int triggerHistoryId; private boolean setTargetTableName; private boolean useDatabaseDefinition; private boolean addMissingColumns; public CacheKey(String routerId, int triggerHistoryId, boolean setTargetTableName, boolean useDatabaseDefinition, boolean addMissingColumns) { this.routerId = routerId; this.triggerHistoryId = triggerHistoryId; this.setTargetTableName = setTargetTableName; this.useDatabaseDefinition = useDatabaseDefinition; this.addMissingColumns = addMissingColumns; } @Override public int hashCode() { final int prime = 31; int result = 1; result = prime * result + ((routerId == null) ? 0 : routerId.hashCode()); result = prime * result + (setTargetTableName ? 1231 : 1237); result = prime * result + triggerHistoryId; result = prime * result + (useDatabaseDefinition ? 1231 : 1237); result = prime * result + (addMissingColumns ? 1231 : 1237); return result; } @Override public boolean equals(Object obj) { if (this == obj) return true; if (obj == null) return false; if (getClass() != obj.getClass()) return false; CacheKey other = (CacheKey) obj; if (routerId == null) { if (other.routerId != null) return false; } else if (!routerId.equals(other.routerId)) return false; if (setTargetTableName != other.setTargetTableName) return false; if (triggerHistoryId != other.triggerHistoryId) return false; if (useDatabaseDefinition != other.useDatabaseDefinition) return false; if (addMissingColumns != other.addMissingColumns) return false; return true; } } Map<CacheKey, Table> cache = new HashMap<CacheKey, Table>(); Node sourceNode; Node targetNode; public ColumnsAccordingToTriggerHistory(Node sourceNode, Node targetNode) { this.sourceNode = sourceNode; this.targetNode = targetNode; } public Table lookup(String routerId, TriggerHistory triggerHistory, boolean setTargetTableName, boolean useDatabaseDefinition, boolean addMissingColumns) { CacheKey key = new CacheKey(routerId, triggerHistory.getTriggerHistoryId(), setTargetTableName, useDatabaseDefinition, addMissingColumns); Table table = cache.get(key); if (table == null) { table = lookupAndOrderColumnsAccordingToTriggerHistory(routerId, triggerHistory, sourceNode, targetNode, setTargetTableName, useDatabaseDefinition, addMissingColumns); cache.put(key, table); } return table; } } class SelectFromSymDataSource implements IExtractDataReaderSource { private Batch batch; private OutgoingBatch outgoingBatch; private Table targetTable; private Table sourceTable; private TriggerHistory lastTriggerHistory; private String lastRouterId; private DataEventType lastDataEventType; private boolean requiresLobSelectedFromSource; private ISqlReadCursor<Data> cursor; private SelectFromTableSource reloadSource; private Node targetNode; private ProcessInfo processInfo; private ColumnsAccordingToTriggerHistory columnsAccordingToTriggerHistory; private Map<Integer, TriggerRouter> triggerRoutersByTriggerHist; private Map<Integer, CounterStat> missingTriggerRoutersByTriggerHist = new HashMap<Integer, CounterStat>(); private boolean containsBigLob; public SelectFromSymDataSource(OutgoingBatch outgoingBatch, Node sourceNode, Node targetNode, ProcessInfo processInfo, boolean containsBigLob) { this.processInfo = processInfo; this.outgoingBatch = outgoingBatch; this.batch = new Batch(BatchType.EXTRACT, outgoingBatch.getBatchId(), outgoingBatch.getChannelId(), symmetricDialect.getBinaryEncoding(), sourceNode.getNodeId(), outgoingBatch.getNodeId(), outgoingBatch.isCommonFlag()); this.targetNode = targetNode; this.columnsAccordingToTriggerHistory = new ColumnsAccordingToTriggerHistory(sourceNode, targetNode); this.outgoingBatch.resetExtractRowStats(); this.containsBigLob = containsBigLob; this.triggerRoutersByTriggerHist = triggerRouterService.getTriggerRoutersByTriggerHist(targetNode.getNodeGroupId(), false); } public SelectFromSymDataSource(OutgoingBatch outgoingBatch, Node sourceNode, Node targetNode, ProcessInfo processInfo) { this(outgoingBatch, sourceNode, targetNode, processInfo, configurationService.getNodeChannel(outgoingBatch.getChannelId(), false).getChannel().isContainsBigLob()); } public Batch getBatch() { return batch; } public Table getSourceTable() { return sourceTable; } public Table getTargetTable() { return targetTable; } public CsvData next() { if (this.cursor == null) { this.cursor = dataService.selectDataFor(batch.getBatchId(), batch.getTargetNodeId(), containsBigLob); } Data data = null; if (reloadSource != null) { data = (Data) reloadSource.next(); targetTable = reloadSource.getTargetTable(); sourceTable = reloadSource.getSourceTable(); if (data == null) { reloadSource.close(); reloadSource = null; } else { this.requiresLobSelectedFromSource = this.reloadSource.requiresLobsSelectedFromSource(data); } lastTriggerHistory = null; } if (data == null) { data = this.cursor.next(); if (data != null) { TriggerHistory triggerHistory = data.getTriggerHistory(); TriggerRouter triggerRouter = triggerRoutersByTriggerHist.get(triggerHistory.getTriggerHistoryId()); if (triggerRouter == null) { CounterStat counterStat = missingTriggerRoutersByTriggerHist.get(triggerHistory.getTriggerHistoryId()); if (counterStat == null) { triggerRouter = triggerRouterService.getTriggerRouterByTriggerHist(targetNode.getNodeGroupId(), triggerHistory.getTriggerHistoryId(), true); if (triggerRouter == null) { counterStat = new CounterStat(data.getDataId(), 1); missingTriggerRoutersByTriggerHist.put(triggerHistory.getTriggerHistoryId(), counterStat); return next(); } } else { counterStat.incrementCount(); return next(); } triggerRoutersByTriggerHist.put(triggerHistory.getTriggerHistoryId(), triggerRouter); } String routerId = triggerRouter.getRouterId(); if (data.getDataEventType() == DataEventType.RELOAD) { processInfo.setCurrentTableName(triggerHistory.getSourceTableName()); String initialLoadSelect = data.getRowData(); if (initialLoadSelect == null && triggerRouter.getTrigger().isStreamRow()) { sourceTable = columnsAccordingToTriggerHistory.lookup(triggerRouter .getRouter().getRouterId(), triggerHistory, false, true, false); Column[] columns = sourceTable.getPrimaryKeyColumns(); String[] pkData = data.getParsedData(CsvData.PK_DATA); boolean[] nullKeyValues = new boolean[columns.length]; for (int i = 0; i < columns.length; i++) { Column column = columns[i]; nullKeyValues[i] = !column.isRequired() && pkData[i] == null; } DmlStatement dmlStmt = platform.createDmlStatement( DmlType.WHERE, sourceTable.getCatalog(), sourceTable.getSchema(), sourceTable.getName(), sourceTable.getPrimaryKeyColumns(), sourceTable.getColumns(), nullKeyValues, null); Row row = new Row(columns.length); for (int i = 0; i < columns.length; i++) { row.put(columns[i].getName(), pkData[i]); } initialLoadSelect = dmlStmt.buildDynamicSql(batch.getBinaryEncoding(), row, false, true, columns); if (initialLoadSelect.endsWith(platform.getDatabaseInfo().getSqlCommandDelimiter())) { initialLoadSelect = initialLoadSelect.substring(0, initialLoadSelect.length() - platform.getDatabaseInfo().getSqlCommandDelimiter().length()); } } SelectFromTableEvent event = new SelectFromTableEvent(targetNode, triggerRouter, triggerHistory, initialLoadSelect); this.reloadSource = new SelectFromTableSource(outgoingBatch, batch, event); data = (Data) this.reloadSource.next(); this.sourceTable = reloadSource.getSourceTable(); this.targetTable = this.reloadSource.getTargetTable(); this.requiresLobSelectedFromSource = this.reloadSource.requiresLobsSelectedFromSource(data); if (data == null) { data = (Data)next(); } } else { Trigger trigger = triggerRouter.getTrigger(); boolean isFileParserRouter = triggerHistory.getTriggerId().equals(AbstractFileParsingRouter.TRIGGER_ID_FILE_PARSER); if (lastTriggerHistory == null || lastTriggerHistory.getTriggerHistoryId() != triggerHistory.getTriggerHistoryId() || lastRouterId == null || !lastRouterId.equals(routerId)) { this.sourceTable = columnsAccordingToTriggerHistory.lookup( routerId, triggerHistory, false, !isFileParserRouter, true); this.targetTable = columnsAccordingToTriggerHistory.lookup( routerId, triggerHistory, true, false, true); if (trigger != null && trigger.isUseStreamLobs() || (data.getRowData() != null && hasLobsThatNeedExtract(sourceTable, data))) { this.requiresLobSelectedFromSource = true; } else { this.requiresLobSelectedFromSource = false; } }else { //这里的如果第一次是删除,然后新增, 那么新增的requiresLobSelectedFromSource 就按照删除的来,这样是不对的,需要判断一下 boolean firstDeleteAndSameTrigger = lastDataEventType != null && DataEventType.DELETE == lastDataEventType && DataEventType.DELETE != data.getDataEventType() && lastTriggerHistory != null && lastTriggerHistory.getTriggerHistoryId() == triggerHistory.getTriggerHistoryId(); //删除也同理 先新增再删除也判断下,这样提升获取数据性能 boolean firstNotDeleteAndSameTrigger = lastDataEventType != null && DataEventType.DELETE != lastDataEventType && DataEventType.DELETE == data.getDataEventType() && lastTriggerHistory != null && lastTriggerHistory.getTriggerHistoryId() == triggerHistory.getTriggerHistoryId(); if(firstDeleteAndSameTrigger||firstNotDeleteAndSameTrigger){ if (trigger != null && trigger.isUseStreamLobs() || (data.getRowData() != null && hasLobsThatNeedExtract(sourceTable, data))) { this.requiresLobSelectedFromSource = true; } else { this.requiresLobSelectedFromSource = false; } } } data.setNoBinaryOldData(requiresLobSelectedFromSource || symmetricDialect.getName().equals( DatabaseNamesConstants.MSSQL2000) || symmetricDialect.getName().equals( DatabaseNamesConstants.MSSQL2005) || symmetricDialect.getName().equals( DatabaseNamesConstants.MSSQL2008) || symmetricDialect.getName().equals( DatabaseNamesConstants.MSSQL2016)); outgoingBatch.incrementExtractRowCount(); outgoingBatch.incrementExtractRowCount(data.getDataEventType()); if (data.getDataEventType().equals(DataEventType.INSERT) || data.getDataEventType().equals(DataEventType.UPDATE)) { int expectedCommaCount = triggerHistory.getParsedColumnNames().length; int commaCount = StringUtils.countMatches(data.getRowData(), ",") + 1; if (commaCount < expectedCommaCount) { String message = "The extracted row for table %s had %d columns but expected %d. "; if (containsBigLob) { message += "Corrupted row for data ID " + data.getDataId() + ": " + data.getRowData(); } else { message += "If this happens often, it might be better to isolate the table with sym_channel.contains_big_lobs enabled."; } throw new ProtocolException(message, data.getTableName(), commaCount, expectedCommaCount); } } if (data.getDataEventType() == DataEventType.CREATE && StringUtils.isBlank(data.getCsvData(CsvData.ROW_DATA))) { String oldData = data.getCsvData(CsvData.OLD_DATA); boolean sendSchemaExcludeIndices = false; boolean sendSchemaExcludeForeignKeys = false; boolean sendSchemaExcludeDefaults = false; if (oldData != null && oldData.length() > 0) { String[] excludes = data.getCsvData(CsvData.OLD_DATA).split(","); for(String exclude : excludes) { if (Constants.SEND_SCHEMA_EXCLUDE_INDICES.equals(exclude)) { sendSchemaExcludeIndices = true; } else if (Constants.SEND_SCHEMA_EXCLUDE_FOREIGN_KEYS.equals(exclude)) { sendSchemaExcludeForeignKeys = true; } else if (Constants.SEND_SCHEMA_EXCLUDE_DEFAULTS.equals(exclude)) { sendSchemaExcludeDefaults = true; } } } boolean excludeDefaults = parameterService.is(ParameterConstants.CREATE_TABLE_WITHOUT_DEFAULTS, false) | sendSchemaExcludeDefaults; boolean excludeForeignKeys = parameterService.is(ParameterConstants.CREATE_TABLE_WITHOUT_FOREIGN_KEYS, false) | sendSchemaExcludeForeignKeys; boolean excludeIndexes = parameterService.is(ParameterConstants.CREATE_TABLE_WITHOUT_INDEXES, false) | sendSchemaExcludeIndices; boolean deferConstraints = outgoingBatch.isLoadFlag() && parameterService.is(ParameterConstants.INITIAL_LOAD_DEFER_CREATE_CONSTRAINTS, false); String[] pkData = data.getParsedData(CsvData.PK_DATA); if (pkData != null && pkData.length > 0) { outgoingBatch.setLoadId(Long.parseLong(pkData[0])); TableReloadStatus tableReloadStatus = dataService.getTableReloadStatusByLoadId(outgoingBatch.getLoadId()); if (tableReloadStatus != null && tableReloadStatus.isCompleted()) { // Ignore create table (indexes and foreign keys) at end of load if it was cancelled return null; } } /* * Force a reread of table so new columns are picked up. A create * event is usually sent after there is a change to the table so * we want to make sure that the cache is updated */ this.sourceTable = getTargetPlatform().getTableFromCache(sourceTable.getCatalog(), sourceTable.getSchema(), sourceTable.getName(), true); this.targetTable = columnsAccordingToTriggerHistory.lookup( routerId, triggerHistory, true, true, false); Table copyTargetTable = this.targetTable.copy(); Database db = new Database(); db.setName("dataextractor"); db.setCatalog(copyTargetTable.getCatalog()); db.setSchema(copyTargetTable.getSchema()); db.addTable(copyTargetTable); if (excludeDefaults) { Column[] columns = copyTargetTable.getColumns(); for (Column column : columns) { column.setDefaultValue(null); Map<String, PlatformColumn> platformColumns = column.getPlatformColumns(); if (platformColumns != null) { Collection<PlatformColumn> cols = platformColumns.values(); for (PlatformColumn platformColumn : cols) { platformColumn.setDefaultValue(null); } } } } if (excludeForeignKeys || deferConstraints) { copyTargetTable.removeAllForeignKeys(); } if (excludeIndexes || deferConstraints) { copyTargetTable.removeAllIndexes(); } if (parameterService.is(ParameterConstants.CREATE_TABLE_WITHOUT_PK_IF_SOURCE_WITHOUT_PK, false) && sourceTable.getPrimaryKeyColumnCount() == 0 && copyTargetTable.getPrimaryKeyColumnCount() > 0) { for (Column column : copyTargetTable.getColumns()) { column.setPrimaryKey(false); } } if (parameterService.is(ParameterConstants.MYSQL_TINYINT_DDL_TO_BOOLEAN, false)) { for (Column column : copyTargetTable.getColumns()) { if (column.getJdbcTypeCode() == Types.TINYINT) { column.setJdbcTypeCode(Types.BOOLEAN); column.setMappedTypeCode(Types.BOOLEAN); } } } data.setRowData(CsvUtils.escapeCsvData(DatabaseXmlUtil.toXml(db))); } } if (data != null) { lastTriggerHistory = data.getTriggerHistory(); lastRouterId = routerId; lastDataEventType = data.getDataEventType(); } } else { closeCursor(); } } return data; } public boolean requiresLobsSelectedFromSource(CsvData data) { return requiresLobSelectedFromSource; } protected void closeCursor() { if (this.cursor != null) { this.cursor.close(); this.cursor = null; } } public void close() { closeCursor(); if (reloadSource != null) { reloadSource.close(); } for (Map.Entry<Integer, CounterStat> entry : missingTriggerRoutersByTriggerHist.entrySet()) { log.warn("Could not find trigger router for trigger hist of {}. Skipped {} events starting with data id of {}", entry.getKey(), entry.getValue().getCount(), entry.getValue().getObject()); } } } class SelectFromTableSource implements IExtractDataReaderSource { private OutgoingBatch outgoingBatch; private Batch batch; private Table targetTable; private Table sourceTable; private List<SelectFromTableEvent> selectFromTableEventsToSend; private SelectFromTableEvent currentInitialLoadEvent; private ISqlReadCursor<Data> cursor; private SimpleRouterContext routingContext; private Node node; private Set<Node> nodeSet; private TriggerRouter triggerRouter; private Map<String, IDataRouter> routers; private IDataRouter dataRouter; private ColumnsAccordingToTriggerHistory columnsAccordingToTriggerHistory; private String overrideSelectSql; private boolean initialLoadSelectUsed; private boolean isSelfReferencingFk; private int selfRefLevel; private String selfRefParentColumnName; private String selfRefChildColumnName; boolean isFirstRow; boolean isLobFirstPass; boolean isConfiguration; public SelectFromTableSource(OutgoingBatch outgoingBatch, Batch batch, SelectFromTableEvent event) { this.outgoingBatch = outgoingBatch; List<SelectFromTableEvent> initialLoadEvents = new ArrayList<DataExtractorService.SelectFromTableEvent>( 1); initialLoadEvents.add(event); this.outgoingBatch.resetExtractRowStats(); this.init(batch, initialLoadEvents); } public SelectFromTableSource(Batch batch, List<SelectFromTableEvent> initialLoadEvents) { this.init(batch, initialLoadEvents); } protected void init(Batch batch, List<SelectFromTableEvent> initialLoadEvents) { this.selectFromTableEventsToSend = new ArrayList<SelectFromTableEvent>( initialLoadEvents); this.batch = batch; this.node = nodeService.findNode(batch.getTargetNodeId(), true); this.nodeSet = new HashSet<Node>(1); this.nodeSet.add(node); this.routers = routerService.getRouters(); if (node == null) { throw new SymmetricException("Could not find a node represented by %s", this.batch.getTargetNodeId()); } this.columnsAccordingToTriggerHistory = new ColumnsAccordingToTriggerHistory(nodeService.findIdentity(), node); } public Table getSourceTable() { return sourceTable; } public Batch getBatch() { return batch; } public Table getTargetTable() { return targetTable; } public void setConfiguration(boolean isConfiguration) { this.isConfiguration = isConfiguration; } public CsvData next() { CsvData data = null; do { data = selectNext(); } while (data != null && routingContext != null && !shouldDataBeRouted(data)); if (data != null && outgoingBatch != null && !outgoingBatch.isExtractJobFlag()) { outgoingBatch.incrementExtractRowCount(); outgoingBatch.incrementExtractRowCount(data.getDataEventType()); } return data; } public boolean shouldDataBeRouted(CsvData data) { DataMetaData dataMetaData = new DataMetaData((Data) data, sourceTable, triggerRouter.getRouter(), routingContext.getChannel()); Collection<String> nodeIds = dataRouter.routeToNodes(routingContext, dataMetaData, nodeSet, true, initialLoadSelectUsed, triggerRouter); return nodeIds != null && nodeIds.contains(node.getNodeId()); } protected CsvData selectNext() { CsvData data = null; if (this.currentInitialLoadEvent == null && selectFromTableEventsToSend.size() > 0) { this.currentInitialLoadEvent = selectFromTableEventsToSend.remove(0); TriggerHistory history = this.currentInitialLoadEvent.getTriggerHistory(); this.isSelfReferencingFk = false; this.isFirstRow = true; if (this.currentInitialLoadEvent.containsData()) { data = this.currentInitialLoadEvent.getData(); this.sourceTable = columnsAccordingToTriggerHistory.lookup( currentInitialLoadEvent.getTriggerRouter().getRouterId(), history, false, true, false); this.targetTable = columnsAccordingToTriggerHistory.lookup( currentInitialLoadEvent.getTriggerRouter().getRouterId(), history, true, false, false); this.currentInitialLoadEvent = null; } else { this.triggerRouter = this.currentInitialLoadEvent.getTriggerRouter(); this.initialLoadSelectUsed = this.currentInitialLoadEvent.getInitialLoadSelect() != null && !this.currentInitialLoadEvent.getInitialLoadSelect().equals("1=1") ? true : StringUtils.isNotBlank(this.triggerRouter.getInitialLoadSelect()); Router router = triggerRouter.getRouter(); if (!StringUtils.isBlank(router.getRouterType())) { this.dataRouter = routers.get(router.getRouterType()); } if (dataRouter == null) { this.dataRouter = routers.get("default"); } if (this.routingContext == null) { NodeChannel channel = batch != null ? configurationService.getNodeChannel( batch.getChannelId(), false) : new NodeChannel(this.triggerRouter .getTrigger().getChannelId()); this.routingContext = new SimpleRouterContext(batch.getTargetNodeId(), channel); } this.sourceTable = columnsAccordingToTriggerHistory.lookup(triggerRouter .getRouter().getRouterId(), history, false, true, false); this.targetTable = columnsAccordingToTriggerHistory.lookup(triggerRouter .getRouter().getRouterId(), history, true, false, false); this.overrideSelectSql = currentInitialLoadEvent.getInitialLoadSelect(); if (overrideSelectSql != null && overrideSelectSql.trim().toUpperCase().startsWith("WHERE")) { overrideSelectSql = overrideSelectSql.trim().substring(5); } if (parameterService.is(ParameterConstants.INITIAL_LOAD_RECURSION_SELF_FK)) { ForeignKey fk = this.sourceTable.getSelfReferencingForeignKey(); if (fk != null) { Reference[] refs = fk.getReferences(); if (refs.length == 1) { this.isSelfReferencingFk = true; this.selfRefParentColumnName = refs[0].getLocalColumnName(); this.selfRefChildColumnName = refs[0].getForeignColumnName(); this.selfRefLevel = 0; log.info("Ordering rows for table {} using self-referencing foreign key {} -> {}", this.sourceTable.getName(), this.selfRefParentColumnName, this.selfRefChildColumnName); } else { log.warn("Unable to order rows for self-referencing foreign key because it contains multiple columns"); } } } ISymmetricDialect symmetricDialectToUse = getSymmetricDialect(); if (this.routingContext.getChannel().isReloadFlag() && symmetricDialectToUse.isInitialLoadTwoPassLob(this.sourceTable)) { this.isLobFirstPass = true; } this.startNewCursor(history, triggerRouter); } } if (this.cursor != null) { data = this.cursor.next(); if (data == null) { closeCursor(); ISymmetricDialect symmetricDialectToUse = getSymmetricDialect(); if (isSelfReferencingFk && !this.isFirstRow) { this.selfRefLevel++; this.startNewCursor(this.currentInitialLoadEvent.getTriggerHistory(), triggerRouter); this.isFirstRow = true; } else if (symmetricDialectToUse.isInitialLoadTwoPassLob(this.sourceTable) && this.isLobFirstPass) { this.isLobFirstPass = false; this.startNewCursor(this.currentInitialLoadEvent.getTriggerHistory(), triggerRouter); } else { this.currentInitialLoadEvent = null; } data = next(); } else if (this.isFirstRow) { this.isFirstRow = false; } } return data; } protected void closeCursor() { if (this.cursor != null) { this.cursor.close(); this.cursor = null; } } public ISymmetricDialect getSymmetricDialect() { ISymmetricDialect dialect = null; if (this.isConfiguration || (sourceTable != null && sourceTable.getNameLowerCase().startsWith(parameterService.getTablePrefix().toLowerCase() + "_"))) { dialect = symmetricDialect; } else { dialect = symmetricDialect.getTargetDialect(); } return dialect; } protected void startNewCursor(final TriggerHistory triggerHistory, final TriggerRouter triggerRouter) { ISymmetricDialect symmetricDialectToUse = getSymmetricDialect(); String selectSql = overrideSelectSql; if (isSelfReferencingFk) { if (selectSql == null) { selectSql = ""; } else if (StringUtils.isNotBlank(selectSql)) { selectSql += " and "; } if (selfRefLevel == 0) { selectSql += "(" + selfRefParentColumnName + " is null or " + selfRefParentColumnName + " = " + selfRefChildColumnName + ") "; } else { DatabaseInfo info = symmetricDialectToUse.getPlatform().getDatabaseInfo(); String tableName = Table.getFullyQualifiedTableName(sourceTable.getCatalog(), sourceTable.getSchema(), sourceTable.getName(), info.getDelimiterToken(), info.getCatalogSeparator(), info.getSchemaSeparator()); String refSql= "select " + selfRefChildColumnName + " from " + tableName + " where " + selfRefParentColumnName; selectSql += selfRefParentColumnName + " in ("; for (int i = 1; i < selfRefLevel; i++) { selectSql += refSql + " in ("; } selectSql += refSql + " is null or " + selfRefChildColumnName + " = " + selfRefParentColumnName + " ) and " + selfRefParentColumnName + " != " + selfRefChildColumnName + StringUtils.repeat(")", selfRefLevel - 1); } log.info("Querying level {} for table {}: {}", selfRefLevel, sourceTable.getName(), selectSql); } Channel channel = configurationService.getChannel(triggerRouter.getTrigger().getReloadChannelId()); if (channel.isReloadFlag() && symmetricDialectToUse.isInitialLoadTwoPassLob(this.sourceTable)) { channel = new Channel(); channel.setContainsBigLob(!this.isLobFirstPass); selectSql = symmetricDialectToUse.getInitialLoadTwoPassLobSql(selectSql, this.sourceTable, this.isLobFirstPass); log.info("Querying {} pass LOB for table {}: {}", (this.isLobFirstPass ? "first" : "second"), sourceTable.getName(), selectSql); } String sql = symmetricDialectToUse.createInitialLoadSqlFor( this.currentInitialLoadEvent.getNode(), triggerRouter, sourceTable, triggerHistory, channel, selectSql); for (IReloadVariableFilter filter : extensionService.getExtensionPointList(IReloadVariableFilter.class)) { sql = filter.filterInitalLoadSql(sql, node, targetTable); } final String initialLoadSql = sql; final int expectedCommaCount = triggerHistory.getParsedColumnNames().length - 1; final boolean selectedAsCsv = symmetricDialectToUse.getParameterService().is( ParameterConstants.INITIAL_LOAD_CONCAT_CSV_IN_SQL_ENABLED); final boolean objectValuesWillNeedEscaped = !symmetricDialectToUse.getTriggerTemplate() .useTriggerTemplateForColumnTemplatesDuringInitialLoad(); final boolean[] isColumnPositionUsingTemplate = symmetricDialectToUse.getColumnPositionUsingTemplate(sourceTable, triggerHistory); final boolean checkRowLength = parameterService.is(ParameterConstants.EXTRACT_CHECK_ROW_SIZE, false); final long rowMaxLength = parameterService.getLong(ParameterConstants.EXTRACT_ROW_MAX_LENGTH, 1000000000); log.debug(sql); this.cursor = symmetricDialectToUse.getPlatform().getSqlTemplate().queryForCursor(initialLoadSql, new ISqlRowMapper<Data>() { public Data mapRow(Row row) { if (checkRowLength) { // Account for double byte characters and encoding long rowSize = row.getLength() * 2; if (rowSize > rowMaxLength) { StringBuffer pkValues = new StringBuffer(); int i = 0; Object[] rowValues = row.values().toArray(); for (String name : sourceTable.getPrimaryKeyColumnNames()) { pkValues.append(name).append("=").append(rowValues[i]); i++; } log.warn("Extract row max size exceeded, keys [" + pkValues.toString() + "], size=" + rowSize); Data data = new Data(0, null, "", DataEventType.SQL, triggerHistory .getSourceTableName(), null, triggerHistory, batch.getChannelId(), null, null); return data; } } String csvRow = null; if (selectedAsCsv) { csvRow = row.stringValue(); int commaCount = StringUtils.countMatches(csvRow, ","); if (commaCount < expectedCommaCount) { throw new SymmetricException( "The extracted row data did not have the expected (%d) number of columns (actual=%s): %s. The initial load sql was: %s", expectedCommaCount, commaCount, csvRow, initialLoadSql); } } else if (objectValuesWillNeedEscaped) { csvRow = platform.getCsvStringValue( symmetricDialect.getBinaryEncoding(), sourceTable.getColumns(), row, isColumnPositionUsingTemplate); } else { csvRow = row.csvValue(); } Data data = new Data(0, null, csvRow, DataEventType.INSERT, triggerHistory .getSourceTableName(), null, triggerHistory, batch.getChannelId(), null, null); return data; } }, checkRowLength && sourceTable.containsLobColumns(symmetricDialect.getPlatform()) && !sourceTable.getNameLowerCase().startsWith(symmetricDialect.getTablePrefix())); } public boolean requiresLobsSelectedFromSource(CsvData data) { if (parameterService.is(ParameterConstants.INITIAL_LOAD_USE_COLUMN_TEMPLATES_ENABLED) && this.currentInitialLoadEvent != null && this.currentInitialLoadEvent.getTriggerRouter() != null) { if (this.currentInitialLoadEvent.getTriggerRouter().getTrigger().isUseStreamLobs() || (data != null && hasLobsThatNeedExtract(sourceTable, data))) { return true; } return this.currentInitialLoadEvent.getTriggerRouter().getTrigger().isUseStreamLobs(); } else { return false; } } public void close() { closeCursor(); } } @Override public void removeBatchFromStaging(OutgoingBatch batch) { IStagedResource resource = getStagedResource(batch); if (resource != null) { resource.delete(); } else { log.info("Could not remove batch {} from staging because it did not exist", batch.getNodeBatchId()); } } class SelectFromTableEvent { private TriggerRouter triggerRouter; private TriggerHistory triggerHistory; private Node node; private Data data; private String initialLoadSelect; public SelectFromTableEvent(Node node, TriggerRouter triggerRouter, TriggerHistory triggerHistory, String initialLoadSelect) { this.node = node; this.triggerRouter = triggerRouter; this.initialLoadSelect = initialLoadSelect; Trigger trigger = triggerRouter.getTrigger(); this.triggerHistory = triggerHistory != null ? triggerHistory : triggerRouterService .getNewestTriggerHistoryForTrigger(trigger.getTriggerId(), trigger.getSourceCatalogName(), trigger.getSourceSchemaName(), trigger.getSourceTableName()); } public SelectFromTableEvent(Data data, TriggerRouter triggerRouter) { this.data = data; this.triggerHistory = data.getTriggerHistory(); this.triggerRouter = triggerRouter; } public TriggerHistory getTriggerHistory() { return triggerHistory; } public TriggerRouter getTriggerRouter() { return triggerRouter; } public Data getData() { return data; } public Node getNode() { return node; } public boolean containsData() { return data != null; } public String getInitialLoadSelect() { return initialLoadSelect; } } static class FutureExtractStatus { boolean shouldExtractSkip; int batchExtractCount; int byteExtractCount; } static class FutureOutgoingBatch { OutgoingBatch outgoingBatch; boolean isRetry; boolean isExtractSkipped; public FutureOutgoingBatch(OutgoingBatch outgoingBatch, boolean isRetry) { this.outgoingBatch = outgoingBatch; this.isRetry = isRetry; } public OutgoingBatch getOutgoingBatch() { return outgoingBatch; } public boolean isRetry() { return isRetry; } } static class BatchLock { public BatchLock(String semaphoreKey) { this.semaphoreKey = semaphoreKey; } public void acquire() throws InterruptedException { inMemoryLock.acquire(); } public void release() { inMemoryLock.release(); } String semaphoreKey; private Semaphore inMemoryLock = new Semaphore(1); StagingFileLock fileLock; int referenceCount = 0; } } |
Date Modified | Username | Field | Change |
---|---|---|---|
2022-09-13 07:39 | qii | New Issue | |
2022-09-13 15:09 | elong | Assigned To | => elong |
2022-09-13 15:09 | elong | Status | new => feedback |
2022-09-13 15:09 | elong | Note Added: 0002193 | |
2022-09-14 09:55 | qii | Note Added: 0002195 | |
2022-09-14 09:55 | qii | Status | feedback => assigned |
2022-09-19 17:39 | elong | Status | assigned => feedback |
2022-09-19 17:39 | elong | Note Added: 0002199 | |
2022-09-21 03:23 | qii | Note Added: 0002201 | |
2022-09-21 03:23 | qii | Status | feedback => assigned |
2024-12-31 18:47 | emiller | Status | assigned => feedback |
2024-12-31 18:47 | emiller | Note Added: 0002569 | |
2024-12-31 18:47 | emiller | Tag Attached: trigger | |
2024-12-31 18:47 | emiller | Tag Attached: large objects | |
2025-01-10 05:43 | qii | Note Added: 0002590 | |
2025-01-10 05:43 | qii | File Added: DataExtractorService.java | |
2025-01-10 05:43 | qii | Status | feedback => assigned |