Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ public class SingerMetrics {

public static final String LOGSTREAM_CREATION_FAILURE = "singer.logstream.creation_failure";

public static final String LOGSTREAM_SORT_EXCEPTION = "singer.logstream.sort_exception";

public static final String LOGSTREAM_MISSING_INODE_PATH = "singer.logstream.missing_inode_path";

public static final String LOGSTREAM_INITIALIZE = "singer.logstream.initialize";
Expand Down Expand Up @@ -123,6 +125,8 @@ public class SingerMetrics {
public static final String LEADER_INFO_EXCEPTION = SINGER_WRITER + "leader_info_exception";
public static final String MISSING_LOCAL_PARTITIONS = "singer.locality.missing_local_partitions";
public static final String MISSING_DIR_CHECKER_INTERRUPTED = "singer.missing_dir_checker.thread_interrupted";
public static final String MISSING_DIR_CHECKER_EXCEPTION = "singer.missing_dir_checker.iteration_exception";
public static final String MISSING_DIR_CHECK_THREAD_STOPPED = "singer.missing_dir_check_thread_stopped";
public static final String NUMBER_OF_MISSING_DIRS = "singer.missing_dir_checker.num_of_missing_dirs";
public static final String NUMBER_OF_SERIALIZING_HEADERS_ERRORS = "singer.headers_injector.num_of_serializing_headers_errors";
public static final String AUDIT_HEADERS_INJECTED = "singer.audit.num_of_headers_injected";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import org.apache.commons.io.comparator.CompositeFileComparator;
import org.apache.commons.io.comparator.LastModifiedFileComparator;
import org.apache.commons.io.comparator.NameFileComparator;
import org.apache.commons.io.filefilter.RegexFileFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -414,11 +413,16 @@ private void createPrefixBasedLogStreams(SingerLog singerLog,

// Sort the file first by last_modified timestamp and then by name in case two files have
// the same mtime due to precision (mtime is up to seconds).
@SuppressWarnings("rawtypes")
Ordering ordering = Ordering.from(
new CompositeFileComparator(
LastModifiedFileComparator.LASTMODIFIED_COMPARATOR, NameFileComparator.NAME_REVERSE));
logFiles = ordering.sortedCopy(logFiles);
try {
@SuppressWarnings("rawtypes")
Ordering ordering = Ordering.from(
new CompositeFileComparator(
LastModifiedFileComparator.LASTMODIFIED_COMPARATOR, NameFileComparator.NAME_REVERSE));
logFiles = ordering.sortedCopy(logFiles);
} catch (Exception e) {
Stats.incr(SingerMetrics.LOGSTREAM_SORT_EXCEPTION);
throw e;
}

for (File file : logFiles) {
String fileName = file.getName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,12 @@ public void setSingerLogsWithoutDir(Map<SingerLog, String> singerLogsWithoutDir)
@Override
public void run() {
LOG.info("[{}] is checking missing directories if any...", Thread.currentThread().getName());
try {
while (!cancelled.get() && singerLogsWithoutDir != null && !singerLogsWithoutDir.isEmpty()) {
while (!cancelled.get() && singerLogsWithoutDir != null && !singerLogsWithoutDir.isEmpty()) {
try {
LOG.info("[{}] Checking missing directories for {} SingerLogs.",
Thread.currentThread().getName(), singerLogsWithoutDir.size());
Iterator<Map.Entry<SingerLog, String>> iterator = singerLogsWithoutDir.entrySet().iterator();
Iterator<Map.Entry<SingerLog, String>> iterator = singerLogsWithoutDir.entrySet()
.iterator();
while (iterator.hasNext()) {
Map.Entry<SingerLog, String> entry = iterator.next();
SingerLog singerLog = entry.getKey();
Expand Down Expand Up @@ -129,29 +130,40 @@ public void run() {
LOG.info("[{}] sleep for {} milliseconds and then check again.",
Thread.currentThread().getName(), sleepInMills);
Thread.sleep(sleepInMills);
} catch (InterruptedException e) {
Stats.incr(SingerMetrics.MISSING_DIR_CHECKER_INTERRUPTED);
LOG.warn("MissingDirChecker thread is interrupted ", e);
break;
} catch (Exception e) {
Stats.incr(SingerMetrics.MISSING_DIR_CHECKER_EXCEPTION);
LOG.error("Unexpected exception in MissingDirChecker iteration. " +
"Will try again next iteration.", e);
try {
Thread.sleep(sleepInMills);
} catch (InterruptedException ie) {
Stats.incr(SingerMetrics.MISSING_DIR_CHECKER_INTERRUPTED);
LOG.warn("MissingDirChecker interrupted during error recovery sleep", ie);
break;
}
}
} catch (InterruptedException e){
Stats.incr(SingerMetrics.MISSING_DIR_CHECKER_INTERRUPTED);
LOG.warn("MissingDirChecker thread is interrupted ", e);
} catch (Exception e){
LOG.error("MissingDirChecker thread needs to stop due to ", e);
} finally {
LOG.info("MissingDirChecker thread stopped. SingerLogs without dir: " + singerLogsWithoutDir);
}
LOG.warn("MissingDirChecker thread stopped. SingerLogs without dir: " +
(singerLogsWithoutDir != null ? singerLogsWithoutDir.size() : 0));
Stats.incr(SingerMetrics.MISSING_DIR_CHECK_THREAD_STOPPED);
}

public synchronized void start() {
if(this.thread == null) {
if (this.thread == null) {
thread = new Thread(this);
thread.setDaemon(true);
thread.setName("MissingDirChecker");
thread.start();
}
}

public synchronized void stop(){
public synchronized void stop() {
cancelled.set(true);
if(this.thread != null && thread.isAlive()) {
if (this.thread != null && thread.isAlive()) {
thread.interrupt();
}
}
Expand Down