package com.perforce.p4splunk;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

import javax.xml.stream.XMLStreamException;

import com.perforce.p4java.exception.AccessException;
import com.perforce.p4java.exception.ConnectionException;
import com.perforce.p4java.exception.RequestException;
import com.perforce.p4java.server.IOptionsServer;
import com.perforce.p4splunk.client.AuthorisationConfig;
import com.perforce.p4splunk.client.ConnectionConfig;
import com.perforce.p4splunk.client.ConnectionFactory;
import com.splunk.modularinput.Event;
import com.splunk.modularinput.EventWriter;
import com.splunk.modularinput.MalformedDataException;
import com.splunk.modularinput.Parameter;
import com.splunk.modularinput.SingleValueParameter;

/**
 * Repeatedly runs the Perforce {@code export} command and forwards every
 * journal entry whose {@code op} field is {@code pv} to the
 * {@link EventWriter} as a {@code key=value key=value ...} event.
 *
 * <p>
 * A fresh server connection is opened and closed on every polling iteration.
 * The journal number and offset reported by the server (the {@code pos}
 * field) are remembered so that the next iteration resumes from there.
 *
 * <p>
 * {@link #setConnection(ConnectionConfig)} and
 * {@link #setAuthorisation(AuthorisationConfig)} must be called before
 * {@link #run()}; {@link #setConfiguration(Map)} is optional.
 */
public class JournalExport implements Runnable {

	/** Pause between two successive export polls, in milliseconds. */
	private static final long POLL_INTERVAL_MS = 3000;

	private final EventWriter log;
	private final String name;

	/** Journal number used for the next {@code export -j} request. */
	private String journal;
	/** Offset inside {@link #journal} used for the next request. */
	private String position;
	/** Optional table filter; no filter is sent when null or empty. */
	private String table;
	/** Value passed to {@code export -l}. */
	private long limit = 10000;
	/** Journal number that {@link #run()} starts from. */
	private String jnlStartIndex = "0";

	private ConnectionConfig connection;
	private AuthorisationConfig auth;

	/**
	 * @param log
	 *            writer used both for diagnostics and for emitting events
	 * @param name
	 *            input name; used as the event stanza and in log prefixes
	 */
	public JournalExport(EventWriter log, String name) {
		this.log = log;
		this.name = name;
		this.position = "0";
	}

	/**
	 * Creates an instance named {@code "test"} with its own
	 * {@link EventWriter}.
	 *
	 * @throws XMLStreamException
	 *             if the {@link EventWriter} cannot be created
	 */
	public JournalExport() throws XMLStreamException {
		this.log = new EventWriter();
		this.name = "test";
		this.position = "0";
	}

	public void setConnection(ConnectionConfig connection) {
		this.connection = connection;
	}

	public void setAuthorisation(AuthorisationConfig auth) {
		this.auth = auth;
	}

	/**
	 * Polling loop: connect, run {@code export}, disconnect, emit events,
	 * sleep, repeat.
	 *
	 * <p>
	 * The loop ends (and the method returns) when the connection cannot be
	 * established, when the export command fails, or when the thread is
	 * interrupted while sleeping. In the interrupted case the thread's
	 * interrupt flag is restored before returning.
	 */
	@Override
	public void run() {
		String logID = "P4[" + name + "]: ";

		this.journal = jnlStartIndex;
		this.position = "0";

		log.synchronizedLog(EventWriter.INFO,
				logID + "Open connection: " + connection.getPort());

		while (true) {
			List<String> argList = buildExportArgs();

			IOptionsServer p4 = connect();
			if (p4 == null) {
				// connect() has already logged the reason; without a server
				// there is nothing to export from.
				return;
			}

			Map<String, Object>[] list;
			try {
				String[] args = argList.toArray(new String[argList.size()]);
				list = p4.execMapCmd("export", args, null);
			} catch (ConnectionException e) {
				String err = logID + "Connection Error: " + e.getMessage();
				log.synchronizedLog(EventWriter.ERROR, err);
				return;
			} catch (RequestException e1) {
				String err = logID + "Request Error: " + e1.getMessage();
				log.synchronizedLog(EventWriter.ERROR, err);
				return;
			} catch (AccessException e1) {
				String err = logID + "Access Error: " + e1.getMessage();
				log.synchronizedLog(EventWriter.ERROR, err);
				return;
			} finally {
				// Never return from here: that would swallow in-flight
				// exceptions and drop entries that were fetched successfully.
				disconnectQuietly(p4, logID);
			}

			// Iterate over the journal entries
			if (list != null) {
				for (Map<String, Object> map : list) {
					processEntry(map, logID);
				}
			}

			try {
				Thread.sleep(POLL_INTERVAL_MS);
			} catch (InterruptedException e) {
				// Keep the interrupt visible to whoever owns this thread.
				Thread.currentThread().interrupt();
				return;
			}
		}
	}

	/**
	 * Builds the argument list for the next {@code export} call from the
	 * current journal position, line limit and optional table filter.
	 */
	private List<String> buildExportArgs() {
		List<String> argList = new ArrayList<String>();

		// Journal position
		argList.add("-j");
		argList.add(journal + "/" + position);

		// Journal line limit
		argList.add("-l");
		argList.add(String.valueOf(limit));

		// Journal table filter
		if (table != null && !table.isEmpty()) {
			argList.add("-F");
			argList.add("\"table=" + table + "\"");
		}
		return argList;
	}

	/**
	 * Handles one result map from {@code export}: records the position
	 * marker, flattens all pairs to {@code key=value } text and, for
	 * {@code pv} entries only, writes the text out as an event.
	 */
	private void processEntry(Map<String, Object> map, String logID) {
		if (map == null) {
			return;
		}

		String type = null;
		StringBuilder data = new StringBuilder();

		// Map key->value pairs to event
		for (Map.Entry<String, Object> entry : map.entrySet()) {
			String key = entry.getKey();
			Object raw = entry.getValue();
			// Values are declared as Object; convert instead of casting so
			// an unexpected type cannot abort the loop.
			String value = (raw == null) ? null : raw.toString();

			// look for position marker
			if ("pos".equals(key) && value != null) {
				updatePosition(value);
			}

			// look for entry type
			if ("op".equals(key)) {
				type = value;
			}

			// log data
			data.append(key).append('=').append(value).append(' ');
		}

		// Only write 'pv' (put value) journal entries
		if ("pv".equals(type)) {
			Event event = new Event();
			event.setStanza(name);
			try {
				event.setData(data.toString());
				log.synchronizedLog(EventWriter.INFO, event.getData());
				log.writeEvent(event);
			} catch (MalformedDataException e) {
				String err = logID + "MalformedData: " + e.getMessage();
				log.synchronizedLog(EventWriter.ERROR, err);
			}
		}
	}

	/**
	 * Updates {@link #journal} and {@link #position} from a {@code pos}
	 * value. A value with a single part is treated as a journal rotation
	 * and resets the offset to {@code "0"}; a value that yields no parts at
	 * all is ignored.
	 */
	private void updatePosition(String value) {
		String[] parts = value.split("/");
		if (parts.length == 0) {
			return;
		}
		this.journal = parts[0];
		if (parts.length == 1) {
			// Journal rotation
			this.position = "0";
		} else {
			this.position = parts[1];
		}
	}

	/**
	 * Disconnects from the server, logging (rather than propagating) any
	 * failure. A null server is ignored.
	 */
	private void disconnectQuietly(IOptionsServer p4, String logID) {
		if (p4 == null) {
			return;
		}
		try {
			p4.disconnect();
		} catch (Exception e) {
			String err = logID + "Disconnect Error: " + e;
			log.synchronizedLog(EventWriter.ERROR, err);
		}
	}

	/**
	 * Convenience wrapper to connect and report errors
	 *
	 * @return the server after a login attempt, or {@code null} if the
	 *         connection or the login threw; the cause is logged
	 */
	private IOptionsServer connect() {
		IOptionsServer server;

		// Connect to the Perforce server
		try {
			server = ConnectionFactory.getConnection(connection);
			log.synchronizedLog(EventWriter.INFO, "P4: opened connection OK");
		} catch (Exception e) {
			String err = "P4: Unable to connect: " + e;
			log.synchronizedLog(EventWriter.ERROR, err);
			return null;
		}

		// Login to Perforce
		try {
			// The boolean result is deliberately not acted upon: isLogin()
			// logs an unrecognised status, and the following export call
			// reports its own error if access is really denied.
			login(server);
		} catch (Exception e) {
			String err = "P4: Unable to login: " + e;
			log.synchronizedLog(EventWriter.ERROR, err);
			// Do not leave the half-initialised connection behind.
			disconnectQuietly(server, "P4: ");
			return null;
		}

		return server;
	}

	/**
	 * Applies the configured credentials to the server.
	 *
	 * @return the result of {@link #isLogin(IOptionsServer)} after the
	 *         credentials have been applied
	 * @throws Exception
	 *             if the authorisation type is not handled, or if a server
	 *             call fails
	 */
	private boolean login(IOptionsServer p4) throws Exception {
		p4.setUserName(auth.getUsername());

		// CHARSET is not defined (only for client access)
		if (p4.getServerInfo().isUnicodeEnabled()) {
			p4.setCharsetName("utf8");
		}

		switch (auth.getType()) {
		case PASSWORD:
			String status = p4.getLoginStatus();
			if (!status.contains("not necessary")) {
				String pass = auth.getPassword();
				p4.login(pass);
			}
			break;

		case TICKET:
			String ticket = auth.getTicketValue();
			p4.setAuthTicket(ticket);
			break;

		case TICKETPATH:
			String path = auth.getTicketPath();
			p4.setTicketsFilePath(path);
			break;

		default:
			throw new Exception("Unknown Authorisation type: " + auth.getType());
		}

		return isLogin(p4);
	}

	/**
	 * Interprets the server's login status text.
	 *
	 * @return {@code true} if the status contains "not necessary" or
	 *         "ticket expires in", or is empty; otherwise logs the status
	 *         as an error and returns {@code false}
	 */
	private boolean isLogin(IOptionsServer p4) throws Exception {
		String status = p4.getLoginStatus();
		if (status.contains("not necessary")) {
			return true;
		}
		if (status.contains("ticket expires in")) {
			return true;
		}
		// If there is a broker or something else that swallows the message
		if (status.isEmpty()) {
			return true;
		}

		String err = "P4: login failed '" + status + "'";
		log.synchronizedLog(EventWriter.ERROR, err);
		return false;
	}

	/**
	 * Reads the optional {@code table}, {@code limit} and
	 * {@code jnlStartIndex} parameters; parameters that are absent leave
	 * the current values untouched.
	 *
	 * @param params
	 *            input parameters keyed by name
	 */
	public void setConfiguration(Map<String, Parameter> params) {
		if (params.containsKey("table")) {
			this.table = ((SingleValueParameter) params.get("table"))
					.getValue();
		}
		if (params.containsKey("limit")) {
			this.limit = ((SingleValueParameter) params.get("limit")).getLong();
		}
		if (params.containsKey("jnlStartIndex")) {
			this.jnlStartIndex = ((SingleValueParameter) params
					.get("jnlStartIndex")).getValue();
		}
	}
}
# | Change | User | Description | Committed | |
---|---|---|---|---|---|
#3 | 10090 | Paul Allen | Submit journal @pv@ entry as "key=value key=value etc..." + Basic password support + Journal limit and start index + Basic Journal db table filter + Open/Close the Perforce connection each iteration | ||
#2 | 10083 | Paul Allen | Added support for journal position and rotation. | ||
#1 | 10077 | Paul Allen | Add export command -- not yet working. |