View Issue Details
ID | Project | Category | View Status | Date Submitted | Last Update |
---|---|---|---|---|---|
0003015 | SymmetricDS | Improvement | public | 2017-03-14 15:41 | 2019-04-24 14:46 |
Reporter | pavan kumar | Assigned To | |||
Priority | low | ||||
Status | closed | Resolution | no change required | ||
Product Version | 3.8.17 | ||||
Summary | 0003015: SymmetricDS with KafkaConsumer. | ||||
Description | We are trying to set up database replication using SymmetricDS with Kafka. We followed the sample below to read data from Oracle and push it to Kafka. Now we are trying to write code that reads from Kafka and inserts the records into Postgres. Although we have written a custom KafkaConsumer to do this, we would like to implement it using the SymmetricDS API in the format below. It would be helpful if you could provide us with the code for a KafkaConsumer with SymmetricDS: insert into SYM_EXTENSION (EXTENSION_ID, EXTENSION_TYPE, INTERFACE_NAME, NODE_GROUP_ID, ENABLED, EXTENSION_ORDER, EXTENSION_TEXT, CREATE_TIME, LAST_UPDATE_BY, LAST_UPDATE_TIME) values ('KafkaDataWriter','java','org.jumpmind.symmetric.io.data.writer.IDatabaseWriterFilter','client',1,1,' import java.io.File; import java.util.HashMap; import java.util.Map; import org.apache.commons.io.FileUtils; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.ProducerConfig; import org.jumpmind.db.model.Table; import org.jumpmind.symmetric.io.data.CsvData; import org.jumpmind.symmetric.io.data.DataContext; import org.jumpmind.symmetric.io.data.DataEventType; import org.jumpmind.symmetric.io.data.writer.IDatabaseWriterFilter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class KafkaWriterFilter implements IDatabaseWriterFilter { protected final String KAKFA_TEXT_CACHE = "KAKFA_TEXT_CACHE" + this.hashCode(); private final Logger log = LoggerFactory.getLogger(getClass()); public boolean beforeWrite(DataContext context, Table table, CsvData data) { if (table.getName().toUpperCase().startsWith("SYM_")) { return true; } else { log.info("Processing table " + table + " for Kafka"); String[] rowData = data.getParsedData(CsvData.ROW_DATA); if (data.getDataEventType() == DataEventType.DELETE) { rowData = data.getParsedData(CsvData.OLD_DATA); } StringBuffer kafkaText = new StringBuffer(); if (context.get(KAKFA_TEXT_CACHE) != null) { 
kafkaText = (StringBuffer) context.get(KAKFA_TEXT_CACHE); } boolean useJson = false; if (useJson) { kafkaText.append("{\"") .append(table.getName()) .append("\": {") .append("\"eventType\": \"" + data.getDataEventType() + "\",") .append("\"data\": { "); for (int i = 0; i < table.getColumnNames().length; i++) { kafkaText.append("\"" + table.getColumnNames()[i] + "\": \"" + rowData[i]); if (i + 1 < table.getColumnNames().length) { kafkaText.append("\","); } } kafkaText.append(" } } }"); } else { kafkaText.append("\nTABLE") .append(",") .append(table.getName()) .append(",") .append("EVENT") .append(",") .append(data.getDataEventType()) .append(","); for (int i = 0; i < table.getColumnNames().length; i++) { kafkaText.append(table.getColumnNames()[i]) .append(",") .append(rowData[i]); if (i + 1 < table.getColumnNames().length) { kafkaText.append(","); } } } context.put(KAKFA_TEXT_CACHE, kafkaText); } return false; } public void afterWrite(DataContext context, Table table, CsvData data) { } public boolean handlesMissingTable(DataContext context, Table table) { return true; } public void earlyCommit(DataContext context) { } public void batchComplete(DataContext context) { if (!context.getBatch().getChannelId().equals("heartbeat") && !context.getBatch().getChannelId().equals("config")) { String batchFileName = "batch-" + context.getBatch().getSourceNodeId() + "-" + context.getBatch().getBatchId(); log.info("Processing batch " + batchFileName + " for Kafka"); try { File batchesDir = new File("batches"); if (!batchesDir.exists()) { batchesDir.mkdir(); } File batchFile = new File(batchesDir.getAbsoluteFile() + "/" + batchFileName); if (context.get(KAKFA_TEXT_CACHE) != null) { String kafkaText = ((StringBuffer) context.get(KAKFA_TEXT_CACHE)).toString(); FileUtils.writeStringToFile(batchFile, KAKFA_TEXT_CACHE); sendKafkaMessage(kafkaText); } else { log.info("No text found to write to kafka queue"); } } catch (Exception e) { log.warn("Unable to write batch to Kafka " + 
batchFileName, e); e.printStackTrace(); } } } public void batchCommitted(DataContext context) { } public void batchRolledback(DataContext context) { } public void sendKafkaMessage(String kafkaText) { Map<String,Object> configs = new HashMap<String, Object>(); configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); configs.put(ProducerConfig.CLIENT_ID_CONFIG, "symmetricds-producer"); KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs); producer.send(new ProducerRecord<String, String>("test", kafkaText)); log.debug("Data to be sent to Kafka-" + kafkaText); producer.close(); } } ',{ts '2017-01-09 10:58:17.981'},'admin',{ts '2017-01-09 13:04:37.490'}); | ||||
Tags | No tags attached. | ||||
related to | 0003609 | closed | josh-a-hicks | Kafka support as a load only node |
Date Modified | Username | Field | Change |
---|---|---|---|
2017-03-14 15:41 | pavan kumar | New Issue | |
2017-03-15 18:48 | chenson | Priority | @60@ => low |
2017-03-15 18:49 | chenson | Assigned To | => chenson |
2017-03-15 18:49 | chenson | Status | new => acknowledged |
2019-04-24 14:46 | elong | Relationship added | related to 0003609 |
2019-04-24 14:46 | elong | Assigned To | chenson => |
2019-04-24 14:46 | elong | Status | acknowledged => closed |
2019-04-24 14:46 | elong | Resolution | open => no change required |
2019-04-24 14:46 | elong | Note Added: 0001429 | |