1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
5 import { unionize, ofType, UnionOf } from "common/unionize";
6 import { ProcessLogs } from './process-logs-panel';
7 import { LogEventType } from 'models/log';
8 import { RootState } from 'store/store';
9 import { ServiceRepository } from 'services/services';
10 import { Dispatch } from 'redux';
11 import { LogFragment, LogService, logFileToLogType } from 'services/log-service/log-service';
12 import { Process, getProcess } from 'store/processes/process';
13 import { navigateTo } from 'store/navigation/navigation-action';
14 import { snackbarActions, SnackbarKind } from 'store/snackbar/snackbar-actions';
15 import { CollectionFile, CollectionFileType } from "models/collection-file";
17 const SNIPLINE = `================ ✀ ================ ✀ ========= Some log(s) were skipped ========= ✀ ================ ✀ ================`;
18 const LOG_TIMESTAMP_PATTERN = /^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}\.[0-9]{9}Z/;
/**
 * Action creators for the process logs panel, built with unionize.
 * The type argument of each ofType<>() is the payload carried by that action.
 */
export const processLogsPanelActions = unionize({
    // Empty payload; dispatched by initProcessLogsPanel before any logs are loaded
    RESET_PROCESS_LOGS_PANEL: ofType<{}>(),
    // Filter names together with the grouped logs they select
    INIT_PROCESS_LOGS_PANEL: ofType<{ filters: string[]; logs: ProcessLogs }>(),
    // Name of the filter to apply
    SET_PROCESS_LOGS_PANEL_FILTER: ofType<string>(),
    // Grouped logs fetched after the initial load (see pollProcessLogs)
    ADD_PROCESS_LOGS_PANEL_ITEM: ofType<ProcessLogs>(),
});
// Upper bound, in bytes, on how much of a single log file is requested per fetch
const maxLogFetchSize = 128 * 1000;
30 type FileWithProgress = {
35 export type ProcessLogsPanelAction = UnionOf<typeof processLogsPanelActions>;
/**
 * Builds the SET_PROCESS_LOGS_PANEL_FILTER action for the given filter name.
 * @param filter Name of the log filter to select
 * @returns The action produced by processLogsPanelActions.SET_PROCESS_LOGS_PANEL_FILTER
 */
export const setProcessLogsPanelFilter = (filter: string) => {
    return processLogsPanelActions.SET_PROCESS_LOGS_PANEL_FILTER(filter);
};
/**
 * Thunk that resets the logs panel and, when the process has a container
 * request uuid, loads its log files from byte 0 and dispatches the initial
 * panel state.
 * @param processUuid UUID used to look the process up in the store resources
 */
export const initProcessLogsPanel = (processUuid: string) =>
    async (dispatch: Dispatch, getState: () => RootState, { logService }: ServiceRepository) => {
        dispatch(processLogsPanelActions.RESET_PROCESS_LOGS_PANEL());

        const process = getProcess(processUuid)(getState().resources);
        const containerRequestUuid = process?.containerRequest?.uuid;
        if (!process || !containerRequestUuid) {
            // Nothing to load without a container request
            return;
        }

        // List the log files (with their sizes) available for this container request
        const logFiles = await loadContainerLogFileList(containerRequestUuid, logService);

        // Nothing has been read yet, so every file starts at byte 0
        const filesWithProgress = logFiles.map((file) => ({ file, lastByte: 0 }));

        // One LogFragment per file that returned log lines
        const logLines = await loadContainerLogFileContents(filesWithProgress, logService, process);

        // Build the filters/logs payload and publish it
        dispatch(processLogsPanelActions.INIT_PROCESS_LOGS_PANEL(
            createInitialLogPanelState(logFiles, logLines)
        ));
    };
/**
 * Thunk that fetches log data added since the last load and dispatches it
 * as ADD_PROCESS_LOGS_PANEL_ITEM.
 *
 * Does nothing when the process has no container request uuid or when the
 * panel has not been initialised yet (no logs in the store).
 *
 * @param processUuid UUID used to look the process up in the store resources
 * @returns Promise that resolves once any new logs have been dispatched and
 *          rejects with the underlying error when listing or fetching fails
 */
export const pollProcessLogs = (processUuid: string) =>
    async (dispatch: Dispatch, getState: () => RootState, { logService }: ServiceRepository): Promise<void> => {
        // Errors are deliberately not caught here: the async function rejects
        // with the original error, so callers can see why polling failed
        // instead of receiving an empty rejection.

        // Get log panel state and process from store
        const currentState = getState().processLogsPanel;
        const process = getProcess(processUuid)(getState().resources);
        const loadedLogNames = Object.keys(currentState.logs);

        // Check if container request is present and initial logs state loaded
        if (process?.containerRequest?.uuid && loadedLogNames.length > 0) {
            const logFiles = await loadContainerLogFileList(process.containerRequest.uuid, logService);

            // Determine byte to fetch from while filtering unchanged files
            const filesToUpdateWithProgress: FileWithProgress[] = logFiles
                // Resume from the last byte already loaded, or 0 for new log files
                .map((file) => ({
                    file,
                    lastByte: currentState.logs[logFileToLogType(file)]?.lastByte ?? 0,
                }))
                .filter(({ file, lastByte }) => {
                    const isNew = !loadedLogNames.some((loadedLogName) => file.name.startsWith(loadedLogName));
                    // Keep new files and files that grew past the last loaded byte
                    return isNew || lastByte < file.size;
                });

            // Perform range request(s) for each file
            const logFragments = await loadContainerLogFileContents(filesToUpdateWithProgress, logService, process);

            if (logFragments.length) {
                // Convert LogFragments to ProcessLogs with All/Main sorting & line-merging
                const groupedLogs = groupLogs(logFiles, logFragments);
                dispatch(processLogsPanelActions.ADD_PROCESS_LOGS_PANEL_ITEM(groupedLogs));
            }
        }
    };
/**
 * Lists the log files of a container request that the logs panel can show.
 * @param containerUuid UUID passed to logService.listLogFiles
 * @param logService Service used to list the log collection contents
 * @returns Entries that are files, map to a panel log event type and have bytes
 */
const loadContainerLogFileList = async (containerUuid: string, logService: LogService) => {
    const collectionEntries = await logService.listLogFiles(containerUuid);

    return collectionEntries.filter((entry): entry is CollectionFile => {
        // Only plain files are log files
        if (entry.type !== CollectionFileType.FILE) {
            return false;
        }
        // Keep files whose log type is shown in the panel and which are not empty
        const isPanelLogType = PROCESS_PANEL_LOG_EVENT_TYPES.includes(logFileToLogType(entry));
        return isPanelLogType && entry.size > 0;
    });
};
/**
 * Loads the contents of each file from each file's lastByte simultaneously
 * while respecting the maxLogFetchSize by requesting the start and end
 * of the desired block and inserting a snipline.
 * @param logFilesWithProgress CollectionFiles with the last byte previously loaded
 * @param logService LogService used to perform the range requests
 * @param process Process whose container request uuid is passed to each request
 * @returns LogFragment[] containing a single LogFragment corresponding to each input file
 */
const loadContainerLogFileContents = async (logFilesWithProgress: FileWithProgress[], logService: LogService, process: Process) => {
    const filesWithBytes = logFilesWithProgress.filter(({ file }) => file.size > 0);

    // Issue the range request(s) for all files in parallel
    const responseSets = await Promise.all(filesWithBytes.map(({ file, lastByte }) => {
        const pendingBytes = file.size - lastByte;
        if (pendingBytes > maxLogFetchSize) {
            // Too much to fetch at once: request only the head and the tail of the pending range
            const halfWindow = Math.floor(maxLogFetchSize / 2);
            return Promise.all([
                logService.getLogFileContents(process.containerRequest.uuid, file, lastByte, lastByte + halfWindow - 1),
                logService.getLogFileContents(process.containerRequest.uuid, file, file.size - halfWindow, file.size - 1),
            ] as Promise<(LogFragment | undefined)>[]);
        }
        return Promise.all([logService.getLogFileContents(process.containerRequest.uuid, file, lastByte, file.size - 1)]);
    }));

    const mergedFragments: LogFragment[] = [];
    for (const responseSet of responseSets) {
        // Drop fragments that are undefined or have no lines
        const usableFragments = responseSet.filter((candidate): candidate is LogFragment => (
            !!candidate && candidate.contents.length > 0
        ));
        if (usableFragments.length === 0) {
            // Nothing was returned for this file
            continue;
        }

        // Append the snip line to the last line of every non-final chunk.
        // It is attached to an existing line (not added as its own line)
        // so that sorting won't reorder it.
        for (const fragment of usableFragments.slice(0, -1)) {
            const lastLineIndex = fragment.contents.length - 1;
            fragment.contents[lastLineIndex] = `${fragment.contents[lastLineIndex]}\n${SNIPLINE}`;
        }

        // Collapse the chunks of this file into a single LogFragment
        mergedFragments.push({
            logType: usableFragments[usableFragments.length - 1].logType,
            contents: ([] as string[]).concat(...usableFragments.map(({ contents }) => contents)),
        });
    }

    return mergedFragments;
};
/**
 * Builds the INIT_PROCESS_LOGS_PANEL payload from freshly loaded logs.
 * @param logFiles Log files the fragments were loaded from
 * @param logFragments Loaded log fragments
 * @returns The grouped logs plus one filter per key of the grouped logs
 */
const createInitialLogPanelState = (logFiles: CollectionFile[], logFragments: LogFragment[]): {filters: string[], logs: ProcessLogs} => {
    const logs = groupLogs(logFiles, logFragments);
    return { filters: Object.keys(logs), logs };
};
/**
 * Converts LogFragments into ProcessLogs, grouping and sorting All/Main logs
 * @param logFiles Log files used to look up the last byte of each log type
 * @param logFragments
 * @returns ProcessLogs for the store
 */
const groupLogs = (logFiles: CollectionFile[], logFragments: LogFragment[]): ProcessLogs => {
    // Multi-line entries are merged first so the combined views can be sorted by line
    const sortableLogFragments = mergeMultilineLoglines(logFragments);

    const allLogs = mergeSortLogFragments(sortableLogFragments);
    const mainFragments = sortableLogFragments.filter(({ logType }) => MAIN_EVENT_TYPES.includes(logType));
    const mainLogs = mergeSortLogFragments(mainFragments);

    // The combined views come first and carry no lastByte
    const processLogs: ProcessLogs = {
        [MAIN_FILTER_TYPE]: { lastByte: undefined, contents: mainLogs },
        [ALL_FILTER_TYPE]: { lastByte: undefined, contents: allLogs },
    };

    // One entry per log type, holding the original (unmerged) lines
    for (const { logType, contents } of logFragments) {
        processLogs[logType as string] = { lastByte: fetchLastByteNumber(logFiles, logType), contents };
    }

    return processLogs;
};
/**
 * Checks for non-timestamped log lines and merges them with the previous line, assumes they are multi-line logs
 * If there is no previous line (first line has no timestamp), the line is deleted.
 * Only used for combined logs that need sorting by timestamp after merging
 *
 * The input fragments and their contents arrays are never modified; every
 * returned fragment owns a new contents array.
 * @param logFragments
 * @returns Modified LogFragment[]
 */
const mergeMultilineLoglines = (logFragments: LogFragment[]): LogFragment[] => (
    logFragments.map((fragment) => {
        // Non-sorted log types are never reordered, so their lines are copied as-is
        if (NON_SORTED_LOG_TYPES.includes(fragment.logType)) {
            return { logType: fragment.logType, contents: [...fragment.contents] };
        }

        // Single forward pass: build the merged list instead of splicing the
        // array while iterating over it (which is quadratic and needs index rewinding)
        const mergedLines: string[] = [];
        for (const line of fragment.contents) {
            if (LOG_TIMESTAMP_PATTERN.test(line)) {
                // A timestamped line starts a new entry
                mergedLines.push(line);
            } else if (mergedLines.length > 0) {
                // Partial line without timestamp: append it to the previous entry
                mergedLines[mergedLines.length - 1] += `\n${line}`;
            }
            // Otherwise there is no previous entry to attach to, so the line is dropped
        }

        return { logType: fragment.logType, contents: mergedLines };
    })
);
/**
 * Merges log lines of different types and sorts types that contain timestamps (are sortable)
 * Lines of non-sorted log types keep their own order and are appended after
 * the sorted lines, grouped by log type name.
 * @param logFragments
 * @returns string[] of merged and sorted log lines
 */
const mergeSortLogFragments = (logFragments: LogFragment[]): string[] => {
    const isNonSorted = (fragment: LogFragment) => NON_SORTED_LOG_TYPES.includes(fragment.logType);

    // Lines of sortable types are pooled and ordered with sortLogLines
    const sortedLines = fragmentsToLines(logFragments.filter((fragment) => !isNonSorted(fragment)));
    sortedLines.sort(sortLogLines);

    // Non-sorted types are ordered by log type only; their lines are left as they are
    const nonSortedFragments = logFragments.filter(isNonSorted);
    nonSortedFragments.sort((a, b) => a.logType.localeCompare(b.logType));

    return sortedLines.concat(fragmentsToLines(nonSortedFragments));
};
/**
 * Comparator for log lines, delegating to String.prototype.localeCompare.
 * @returns Negative when a sorts before b, positive when after, 0 when equal
 */
const sortLogLines = (a: string, b: string) => a.localeCompare(b);
/**
 * Flattens the contents of the given fragments into one list of lines,
 * keeping fragment order and line order.
 *
 * Lines are pushed one by one rather than spread into a concat() call:
 * spreading passes every line as a call argument, which throws a RangeError
 * for very large fragments and copies the accumulated array for each fragment.
 * @param fragments Fragments whose lines should be concatenated
 * @returns New array holding every line of every fragment
 */
const fragmentsToLines = (fragments: LogFragment[]): string[] => {
    const lines: string[] = [];
    for (const fragment of fragments) {
        for (const line of fragment.contents) {
            lines.push(line);
        }
    }
    return lines;
};
/**
 * Looks up the size of the first log file whose name starts with the given key.
 * @param logFiles Log files to search, in order
 * @param key Name prefix to match (callers pass a log type)
 * @returns Size of the first matching file, or undefined when none matches
 */
const fetchLastByteNumber = (logFiles: CollectionFile[], key: string) => {
    for (const file of logFiles) {
        if (file.name.startsWith(key)) {
            return file.size;
        }
    }
    return undefined;
};
/**
 * Thunk that navigates to a log collection after confirming it can be fetched.
 * When fetching (or navigating) throws, an error snackbar is shown instead.
 * @param uuid UUID of the collection to open
 */
export const navigateToLogCollection = (uuid: string) =>
    async (dispatch: Dispatch<any>, getState: () => RootState, services: ServiceRepository) => {
        try {
            // Requesting the collection first means navigation only happens if it can be fetched
            const { collectionService } = services;
            await collectionService.get(uuid);
            dispatch<any>(navigateTo(uuid));
        } catch {
            const requestFailedNotice = snackbarActions.OPEN_SNACKBAR({
                message: 'Could not request collection',
                hideDuration: 2000,
                kind: SnackbarKind.ERROR,
            });
            dispatch(requestFailedNotice);
        }
    };
267 const ALL_FILTER_TYPE = 'All logs';
269 const MAIN_FILTER_TYPE = 'Main logs';
270 const MAIN_EVENT_TYPES = [
271 LogEventType.CRUNCH_RUN,
276 const PROCESS_PANEL_LOG_EVENT_TYPES = [
277 LogEventType.ARV_MOUNT,
278 LogEventType.CRUNCH_RUN,
279 LogEventType.CRUNCHSTAT,
280 LogEventType.DISPATCH,
281 LogEventType.HOSTSTAT,
282 LogEventType.NODE_INFO,
285 LogEventType.CONTAINER,
286 LogEventType.KEEPSTORE,
289 const NON_SORTED_LOG_TYPES = [
290 LogEventType.NODE_INFO,
291 LogEventType.CONTAINER,