diff --git a/singer/src/main/java/com/pinterest/singer/common/SingerMetrics.java b/singer/src/main/java/com/pinterest/singer/common/SingerMetrics.java index 62ed93d5..5630ea33 100644 --- a/singer/src/main/java/com/pinterest/singer/common/SingerMetrics.java +++ b/singer/src/main/java/com/pinterest/singer/common/SingerMetrics.java @@ -45,6 +45,8 @@ public class SingerMetrics { public static final String LOGSTREAM_CREATION_FAILURE = "singer.logstream.creation_failure"; + public static final String LOGSTREAM_SORT_EXCEPTION = "singer.logstream.sort_exception"; + public static final String LOGSTREAM_MISSING_INODE_PATH = "singer.logstream.missing_inode_path"; public static final String LOGSTREAM_INITIALIZE = "singer.logstream.initialize"; @@ -123,6 +125,8 @@ public class SingerMetrics { public static final String LEADER_INFO_EXCEPTION = SINGER_WRITER + "leader_info_exception"; public static final String MISSING_LOCAL_PARTITIONS = "singer.locality.missing_local_partitions"; public static final String MISSING_DIR_CHECKER_INTERRUPTED = "singer.missing_dir_checker.thread_interrupted"; + public static final String MISSING_DIR_CHECKER_EXCEPTION = "singer.missing_dir_checker.iteration_exception"; + public static final String MISSING_DIR_CHECK_THREAD_STOPPED = "singer.missing_dir_check_thread_stopped"; public static final String NUMBER_OF_MISSING_DIRS = "singer.missing_dir_checker.num_of_missing_dirs"; public static final String NUMBER_OF_SERIALIZING_HEADERS_ERRORS = "singer.headers_injector.num_of_serializing_headers_errors"; public static final String AUDIT_HEADERS_INJECTED = "singer.audit.num_of_headers_injected"; diff --git a/singer/src/main/java/com/pinterest/singer/monitor/LogStreamManager.java b/singer/src/main/java/com/pinterest/singer/monitor/LogStreamManager.java index 9b7b1de1..65059dea 100644 --- a/singer/src/main/java/com/pinterest/singer/monitor/LogStreamManager.java +++ b/singer/src/main/java/com/pinterest/singer/monitor/LogStreamManager.java @@ -45,7 +45,6 @@ import org.apache.commons.io.comparator.CompositeFileComparator; import org.apache.commons.io.comparator.LastModifiedFileComparator; import org.apache.commons.io.comparator.NameFileComparator; -import org.apache.commons.io.filefilter.RegexFileFilter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -414,11 +413,16 @@ private void createPrefixBasedLogStreams(SingerLog singerLog, // Sort the file first by last_modified timestamp and then by name in case two files have // the same mtime due to precision (mtime is up to seconds). - @SuppressWarnings("rawtypes") - Ordering ordering = Ordering.from( - new CompositeFileComparator( - LastModifiedFileComparator.LASTMODIFIED_COMPARATOR, NameFileComparator.NAME_REVERSE)); - logFiles = ordering.sortedCopy(logFiles); + try { + @SuppressWarnings("rawtypes") + Ordering ordering = Ordering.from( + new CompositeFileComparator( + LastModifiedFileComparator.LASTMODIFIED_COMPARATOR, NameFileComparator.NAME_REVERSE)); + logFiles = ordering.sortedCopy(logFiles); + } catch (Exception e) { + Stats.incr(SingerMetrics.LOGSTREAM_SORT_EXCEPTION); + throw e; + } for (File file : logFiles) { String fileName = file.getName(); diff --git a/singer/src/main/java/com/pinterest/singer/monitor/MissingDirChecker.java b/singer/src/main/java/com/pinterest/singer/monitor/MissingDirChecker.java index eacee6dd..bd76c21a 100644 --- a/singer/src/main/java/com/pinterest/singer/monitor/MissingDirChecker.java +++ b/singer/src/main/java/com/pinterest/singer/monitor/MissingDirChecker.java @@ -81,11 +81,12 @@ public void setSingerLogsWithoutDir(Map singerLogsWithoutDir) @Override public void run() { LOG.info("[{}] is checking missing directories if any...", Thread.currentThread().getName()); - try { - while (!cancelled.get() && singerLogsWithoutDir != null && !singerLogsWithoutDir.isEmpty()) { + while (!cancelled.get() && singerLogsWithoutDir != null && !singerLogsWithoutDir.isEmpty()) { + try { LOG.info("[{}] Checking missing directories for {} SingerLogs.", Thread.currentThread().getName(), singerLogsWithoutDir.size()); - Iterator> iterator = singerLogsWithoutDir.entrySet().iterator(); + Iterator> iterator = singerLogsWithoutDir.entrySet() + .iterator(); while (iterator.hasNext()) { Map.Entry entry = iterator.next(); SingerLog singerLog = entry.getKey(); @@ -129,19 +130,30 @@ public void run() { LOG.info("[{}] sleep for {} milliseconds and then check again.", Thread.currentThread().getName(), sleepInMills); Thread.sleep(sleepInMills); + } catch (InterruptedException e) { + Stats.incr(SingerMetrics.MISSING_DIR_CHECKER_INTERRUPTED); + LOG.warn("MissingDirChecker thread is interrupted ", e); + break; + } catch (Exception e) { + Stats.incr(SingerMetrics.MISSING_DIR_CHECKER_EXCEPTION); + LOG.error("Unexpected exception in MissingDirChecker iteration. " + + "Will try again next iteration.", e); + try { + Thread.sleep(sleepInMills); + } catch (InterruptedException ie) { + Stats.incr(SingerMetrics.MISSING_DIR_CHECKER_INTERRUPTED); + LOG.warn("MissingDirChecker interrupted during error recovery sleep", ie); + break; + } } - } catch (InterruptedException e){ - Stats.incr(SingerMetrics.MISSING_DIR_CHECKER_INTERRUPTED); - LOG.warn("MissingDirChecker thread is interrupted ", e); - } catch (Exception e){ - LOG.error("MissingDirChecker thread needs to stop due to ", e); - } finally { - LOG.info("MissingDirChecker thread stopped. SingerLogs without dir: " + singerLogsWithoutDir); } + LOG.warn("MissingDirChecker thread stopped. SingerLogs without dir: " + + (singerLogsWithoutDir != null ? singerLogsWithoutDir.size() : 0)); + Stats.incr(SingerMetrics.MISSING_DIR_CHECK_THREAD_STOPPED); } public synchronized void start() { - if(this.thread == null) { + if (this.thread == null) { thread = new Thread(this); thread.setDaemon(true); thread.setName("MissingDirChecker"); @@ -149,9 +161,9 @@ public synchronized void start() { } } - public synchronized void stop(){ + public synchronized void stop() { cancelled.set(true); - if(this.thread != null && thread.isAlive()) { + if (this.thread != null && thread.isAlive()) { thread.interrupt(); } }