0 / 0

Apache Kafka (DataStage)

Ultimo aggiornamento: 12 mar 2025
Apache Kafka in DataStage

Utilizza il connettore Apache Kafka in DataStage® per scrivere e leggere flussi di eventi da e verso argomenti.

Prerequisito

Crea la connessione. Per istruzioni, consultare Connessione a una origine dati in DataStage e Connessione Apache Kafka.

Lettura dei dati dagli argomenti in Kafka

È possibile configurare il connettore Apache Kafka per leggere i messaggi da un'origine dati Kafka .

Figura 1. Esempio di lettura dei dati dal connettore Apache Kafka
Esempio di lettura dei dati dal connettore Apache Kafka
Configurazione del connettore Apache Kafka come origine
  1. Dall'area di disegno del job, fare doppio clic su Apache Kafka connector.
  2. Nella scheda Stage , fare clic su Proprietà.
  3. Selezionare Utilizza proprietà DataStage.
  4. Immettere i valori per le proprietà richieste:
    • Nome argomento: - argomento Kafka da cui leggere i messaggi.
    • Serializer chiave - Il serializzatore per i tipi di dati chiave in Apache Kafka. Il valore del serializzatore chiavi deve essere compatibile con la colonna chiave (se definita) nel link Output . Se si seleziona un serializer chiave Avro o Avro as JSON , il serializer valore deve essere impostato su Avro o Avro as JSON.
    • Serializer valore - Il serializer per i tipi di dati valore in Apache Kafka. Il valore del serializzatore valore deve essere compatibile con la colonna chiave (se definita) nel link Output .
    • Gruppo consumer - Una stringa che identifica in modo univoco il gruppo di processi consumer a cui appartiene questo consumer. Impostando lo stesso ID gruppo per più processi, si indica che fanno tutti parte dello stesso gruppo di consumatori.
    • Numero massimo di record di polling - Il numero massimo di record restituiti in una singola chiamata di polling.
    • Numero massimo di messaggi - Il numero massimo di messaggi da utilizzare o produrre da o verso l'argomento in base al processo del lettore. Questo valore è un multiplo dei record di polling massimi. Questa impostazione non è attiva se si seleziona Modalità continua.
    • Politica di reimpostazione - Il valore che indica la politica se non vi è alcun offset iniziale nel server Kafka o se l'offset corrente non esiste più sul server. Ad esempio, se i dati vengono eliminati, è possibile utilizzare i valori earliest e latest per reimpostare automaticamente l'offset sul primo dell'ultimo offset.
  5. Configura proprietà facoltative:
    • Modalità continua - Il connettore recupererà i messaggi in una modalità continua attendendo nuovi messaggi in entrata per un tempo indefinito.
      • Arresta messaggio - Il messaggio che arresta l'elaborazione in modalità continua.
    • Transazione - La transazione che invia le impostazioni dell'indicatore "End of wave":
      • Conteggio record - Il numero di record per transazione. Il valore 0 indica tutti i record disponibili.
      • Intervallo di tempo - L'intervallo di tempo per una transazione. Utilizzare questa proprietà se si imposta Conteggio record su 0.
    • End of wave - Invia un indicatore "End of wave" dopo ogni transazione.
    • Fine dei dati - Inserire un indicatore "Fine della movimentazione" per la serie finale di record quando il loro numero è inferiore al valore specificato per il numero di record della transazione. Se il valore Conteggio record della transazione è 0 (tutti i record disponibili), esiste un solo wave di transazione. In questo caso, impostare il valore Fine dei dati su Yes in modo che l'indicatore "Fine del wave" venga inserito per tale wave di transazione.
    • Livello di registrazione - Configura la registrazione del client Kafka . Vedere Controllo dell'output del log.
    • Proprietà di configurazione client - Imposta ulteriori proprietà del client Kafka . Vedere Kafka client configuration properties
  6. Fare clic sulla scheda Output per definire le colonne per il connettore. Le colonne devono essere compatibili con il tipo di dati definito nelle proprietà Serializzatore chiave e Serializzatore valore . Se si utilizza un programma di serializzazione Avro o Avro come JSON , i nomi colonna devono corrispondere ai campi del record Avro. Fare riferimento a Registro schema per i dettagli.
    • value - La colonna contenente i valori del messaggio Kafka . Deve essere compatibile con la proprietà Programma di serializzazione valori .
    • key - la colonna che contiene le chiavi del messaggio Kafka . Deve essere compatibile con la proprietà Serializzatore chiavi .
    • partition - La colonna che contiene il numero di partizione da cui proviene il messaggio Kafka . Deve essere compatibile con il tipo di dati Integer.
    • data/ora - La colonna che contiene la data/ora per il messaggio Kafka . Deve essere compatibile con il tipo di dati Data / ora.
    • offset - la colonna che contiene l'offset per il messaggio Kafka . Deve essere compatibile con il tipo di dati BigInt .

Scrittura dei dati negli argomenti in Kafka

Puoi configurare il connettore Apache Kafka per scrivere i dati in un'origine dati Kafka .

Figura 2. Esempio di scrittura dei dati nell'origine dati Kafka
Esempio di scrittura dei dati nell'origine dati Kafka
Configurazione del connettore Apache Kafka come destinazione
  1. Dall'area di disegno del job, fare doppio clic su Apache Kafka connector.
  2. Nella scheda Stage , fare clic su Proprietà.
  3. Selezionare Utilizza proprietà DataStage.
  4. Immettere i valori per le proprietà richieste:
    • Nome argomento : il nome del topic in cui devono essere scritti i messaggi dallo stage upstream.
    • Serializer chiave - Il serializzatore per i tipi di dati chiave in Apache Kafka. Il valore del serializzatore chiave deve essere compatibile con la colonna chiave (se definita) nel link Input . Se si seleziona un serializer chiave Avro o Avro as JSON , il serializer Valore deve essere impostato su Avro o su Avro as JSON.
    • Serializer valore - Il serializer per i tipi di dati valore in Apache Kafka. Il valore del serializzatore valore deve essere compatibile con la colonna chiave (se definita) nel link Input .
  5. Configura proprietà facoltative:
  6. Fare clic sulla scheda Input per definire le colonne per il connettore. Le colonne devono essere compatibili con il tipo di dati definito nelle proprietà Serializzatore chiave e Serializzatore valore . Se si utilizza un serializzatore Avro o Avro as JSON , i nomi colonna devono corrispondere ai campi del record Avro. Fare riferimento a Registro schema per i dettagli.
    • value - La colonna contenente i valori del messaggio Kafka . Deve essere compatibile con la proprietà Programma di serializzazione valori .
    • key - la colonna che contiene le chiavi del messaggio Kafka . Deve essere compatibile con la proprietà Serializzatore chiavi .

Registro degli schemi

Il registro dello schema ti consente di utilizzare messaggi Kafka più complessi. È possibile utilizzare il registro dello schema per le definizioni di colonna DataStage generali all'interno delle progettazioni del job.

Per default, i messaggi vengono restituiti come un valore all'interno di una singola colonna che definisci nel connettore Apache Kafka . Quando utilizzi i formati Avro e Avro as JSON e il registro dello schema, abiliti la decomposizione dei messaggi Kafka complessi nelle colonne DataStage .

Quando si scrive in un topic, il connettore crea uno schema basato sulle definizioni di colonna.

Quando il connettore legge da un argomento Kafka , prende uno schema dal registro dello schema. In base a tale schema, i campi record Avro vengono estratti in colonne definite come output del connettore.

Ad esempio, quando lo schema Avro è definito come segue:
{
  "fields": [
    {
      "name": "id",
      "type": "int"
    },
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "salary",
      "type": "double"
    }
  ],
  "name": "KafkaConnectorSchema",
  "type": "record"
}
Nel connettore Apache Kafka , definisci le colonne come segue:
Tabella 1. Nome colonna e tipo di dati
Nome colonna Tipo di dati
id Intero
nome VarChar
stipendio Doppio

Per utilizzare il registro di schema, utilizzare la configurazione seguente:

  • È necessario impostare la proprietà di configurazione Serializzatore chiavi o Serializzatore valori su Avro o su Avro as JSON.
  • Dopo aver impostato il serializer chiave su Avro o Avro as JSON, non è possibile configurare il serializer valore su un valore diverso da Avro o Avro as JSON.
  • Quando si imposta la proprietà di configurazione Serializer chiave su Avro, la definizione della colonna per il link deve essere conforme alla definizione dello schema Avro.
  • Le colonne chiave devono essere selezionate con la casella di controllo Chiave oppure i loro nomi devono iniziare con un prefisso key_ . Utilizzare solo un metodo. Non è possibile utilizzare una casella di spunta Chiave per una colonna e un prefisso key_ per un'altra colonna.

Proprietà di configurazione client Kafka

Utilizza Proprietà di configurazione client (in Configurazione Kafka avanzata nella scheda Stage ) quando devi impostare una proprietà di configurazione client Kafka , ma il connettore Apache Kafka non dispone di una finestra di configurazione.

Immettere la proprietà con il formato key=value con voci definite su ciascuna riga. Utilizzare il carattere # per i commenti. Per una descrizione delle regole per le proprietà di configurazione del client, consultare la definizione Proprietà Java .

Risoluzione dei problemi

Controllo dell'output del log

Registrazione client Kafka

Utilizza le proprietà nella configurazione Kafka avanzata per controllare la registrazione del client Kafka . Le voci di log del client Kafka verranno scritte nel log del job DataStage insieme alle voci di log per l'output degli stage.

  • Livello di registrazione: imposta il livello minimo di messaggi prodotti dal client Kafka per scrivere nel log del lavoro.
  • Voci del log degli errori e delle avvertenze: modificare il valore in Registra come informativo o in Registra come avvertenza per controllare se eventuali messaggi ERROR o FATAL verranno scritti nel log del lavoro con una gravità che potrebbe causare un malfunzionamento del lavoro.

Per visualizzare i messaggi Debug o Trace , impostare il livello di registrazione associato nella variabile di ambiente CC_MSG_LEVEL .

Messaggi di errore

Tabella 2. Risoluzione degli errori
Tipo errore Dettagli dell'errore Soluzione
KafkaProducer - Impossibile aggiornare metadati dopo 20000 ms Il client Kafka non riporta il problema di connessione quando si verifica una configurazione non corretta tra l'autenticazione impostata sul lato broker Kafka e all'interno del connettore Apache Kafka . Ad esempio, il connettore Apache Kafka è configurato per la connessione senza alcuna impostazione di sicurezza, ma la connessione viene effettuata sulla porta protetta con SSL. Quando il connettore Apache Kafka viene configurato per leggere i messaggi da Kafka, il lavoro smetterà di rispondere al passo di connessione e il client Kafka non segnalerà un errore. È necessario arrestare il job manualmente. Assicurarsi che le impostazioni di connessione (nome host, porta, tipo di connessione sicura e proprietà) corrispondano all'impostazione della connessione del broker Kafka .
Impossibile completare il commit Quando il connettore Apache Kafka è configurato per l'esecuzione in modalità continua, il lavoro potrebbe avere esito negativo con questo messaggio:

Kafka Connector was unable to commit messages due to following error: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
Nella sezione Avanzate dello stage, modificare il modo Esecuzione in Sequenziale.
Chiave null Il file keytab non è corretto o le credenziali del principal utente non esistono nel file keytab. Accertarsi che il file keytab sia corretto.
Eccezione del programma di analisi SAX generata: l'input è terminato prima della fine di tutte le tag avviate. L'ultimo tag avviato era 'AdvancedKafkaConfigOptions' Il valore di Proprietà di configurazione client può contenere più righe di voci chiave - valore. Poiché ogni proprietà può essere parametrizzata con il formato #name# , viene controllato anche se è presente un parametro. Allo stesso tempo, il valore delle proprietà del client Kafka deve essere conforme ai requisiti della classe Proprietà Java . Il carattere # può essere interpretato come commento o come inizio di un parametro del job. Quando il carattere # viene visualizzato all'interno dell'ultima riga di questa proprietà, il preprocessore presume che il carattere corrisponda a un secondo carattere # e non riesce se non viene trovato. In Proprietà di configurazione client, assicurarsi che sia presente almeno una nuova riga vuota alla fine di ciascuna proprietà.