1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
5 import { unionize, ofType, UnionOf } from "common/unionize";
6 import { ProcessLogs } from './process-logs-panel';
7 import { LogEventType } from 'models/log';
8 import { RootState } from 'store/store';
9 import { ServiceRepository } from 'services/services';
10 import { Dispatch } from 'redux';
11 import { LogFragment, LogService, logFileToLogType } from 'services/log-service/log-service';
12 import { Process, getProcess } from 'store/processes/process';
13 import { navigateTo } from 'store/navigation/navigation-action';
14 import { snackbarActions, SnackbarKind } from 'store/snackbar/snackbar-actions';
15 import { CollectionFile, CollectionFileType } from "models/collection-file";
16 import { ContainerRequestResource } from "models/container-request";
/** Marker text appended to the last line of a non-final chunk when only the start and end of a log were fetched. */
const SNIPLINE = '================ ✀ ================ ✀ ========= Some log(s) were skipped ========= ✀ ================ ✀ ================';

/** Matches the leading timestamp (nanosecond precision, trailing "Z") used to recognise timestamped log lines. */
const LOG_TIMESTAMP_PATTERN = /^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}\.[0-9]{9}Z/;
/**
 * Action creators for the process logs panel.
 */
export const processLogsPanelActions = unionize({
    // Dispatched with an empty payload at the start of panel initialisation
    RESET_PROCESS_LOGS_PANEL: ofType<{}>(),
    // Carries the initial filter names together with the grouped logs
    INIT_PROCESS_LOGS_PANEL: ofType<{ filters: string[], logs: ProcessLogs }>(),
    // Carries the name of the filter to select
    SET_PROCESS_LOGS_PANEL_FILTER: ofType<string>(),
    // Carries newly fetched, grouped logs produced while polling
    ADD_PROCESS_LOGS_PANEL_ITEM: ofType<ProcessLogs>(),
});
// Upper bound, in bytes, on the amount of log data requested for one file in a single load
const maxLogFetchSize = 128 * 1000;
/**
 * A log file paired with the byte offset up to which its contents were
 * already loaded (0 when nothing has been loaded yet).
 *
 * NOTE(review): the field list is reconstructed from how values of this type
 * are built and destructured in this module (`{ file, lastByte }`) — confirm.
 */
type FileWithProgress = {
    file: CollectionFile;
    lastByte: number;
};
/** Union of every action that `processLogsPanelActions` can create. */
export type ProcessLogsPanelAction = UnionOf<typeof processLogsPanelActions>;
/**
 * Builds the action that selects a log filter in the process logs panel.
 * @param filter Name of the filter to select
 * @returns SET_PROCESS_LOGS_PANEL_FILTER action carrying the filter name
 */
export const setProcessLogsPanelFilter = (filter: string) => {
    const { SET_PROCESS_LOGS_PANEL_FILTER: selectFilter } = processLogsPanelActions;
    return selectFilter(filter);
};
/**
 * Resets the process logs panel and loads the initial log contents of a process.
 *
 * When the process has a container request, its log files are listed, their
 * contents are fetched starting at byte 0 and the panel is initialised with
 * the grouped result. On any failure the panel is initialised with an empty
 * state instead, and an error toast is shown unless the failure status is 404.
 *
 * @param processUuid UUID of the process whose logs should be shown
 * @returns Thunk performing the load
 */
export const initProcessLogsPanel = (processUuid: string) =>
    async (dispatch: Dispatch, getState: () => RootState, { logService }: ServiceRepository) => {
        try {
            dispatch(processLogsPanelActions.RESET_PROCESS_LOGS_PANEL());
            const process = getProcess(processUuid)(getState().resources);
            if (process?.containerRequest?.uuid) {
                // Get log file size info
                const logFiles = await loadContainerLogFileList(process.containerRequest, logService);

                // Populate lastbyte 0 for each file
                const filesWithProgress = logFiles.map((file) => ({ file, lastByte: 0 }));

                // Fetch array of LogFragments
                const logLines = await loadContainerLogFileContents(filesWithProgress, logService, process);

                // Populate initial state with filters
                const initialState = createInitialLogPanelState(logFiles, logLines);
                dispatch(processLogsPanelActions.INIT_PROCESS_LOGS_PANEL(initialState));
            }
        } catch (e) {
            // On error, populate empty state to allow polling to start
            const initialState = createInitialLogPanelState([], []);
            dispatch(processLogsPanelActions.INIT_PROCESS_LOGS_PANEL(initialState));

            // The rejection value is not guaranteed to be an object: read the
            // status defensively so a null/undefined rejection cannot throw here.
            const status = (e as { status?: unknown } | null | undefined)?.status;

            // Only show toast on errors other than 404 since 404 is expected when logs do not exist yet
            if (status !== 404) {
                dispatch(snackbarActions.OPEN_SNACKBAR({ message: 'Could not load process logs', hideDuration: 2000, kind: SnackbarKind.ERROR }));
            }
        }
    };
/**
 * Fetches log data that appeared since the last load and adds it to the panel.
 *
 * Does nothing unless the process has a container request and the panel
 * already holds an initial logs state. Only files that are new, or whose size
 * grew beyond the last loaded byte, are requested.
 *
 * @param processUuid UUID of the process being polled
 * @returns Thunk whose promise rejects with the original error when polling fails
 */
export const pollProcessLogs = (processUuid: string) =>
    async (dispatch: Dispatch, getState: () => RootState, { logService }: ServiceRepository) => {
        try {
            // Get log panel state and process from store
            const currentState = getState().processLogsPanel;
            const process = getProcess(processUuid)(getState().resources);

            // Check if container request is present and initial logs state loaded
            if (process?.containerRequest?.uuid && Object.keys(currentState.logs).length > 0) {
                const logFiles = await loadContainerLogFileList(process.containerRequest, logService);
                const knownLogNames = Object.keys(currentState.logs);

                // Determine byte to fetch from while filtering unchanged files
                const filesToUpdateWithProgress: FileWithProgress[] = [];
                for (const updatedFile of logFiles) {
                    // Fetch last byte or 0 for new log files
                    const currentStateLogLastByte = currentState.logs[logFileToLogType(updatedFile)]?.lastByte || 0;

                    // A file is new when no known log name is a prefix of its file name
                    const isNew = !knownLogNames.some((knownName) => updatedFile.name.startsWith(knownName));
                    const isChanged = !isNew && currentStateLogLastByte < updatedFile.size;

                    if (isNew || isChanged) {
                        filesToUpdateWithProgress.push({ file: updatedFile, lastByte: currentStateLogLastByte });
                    }
                }

                // Perform range request(s) for each file
                const logFragments = await loadContainerLogFileContents(filesToUpdateWithProgress, logService, process);

                if (logFragments.length) {
                    // Convert LogFragments to ProcessLogs with All/Main sorting & line-merging
                    const groupedLogs = groupLogs(logFiles, logFragments);
                    await dispatch(processLogsPanelActions.ADD_PROCESS_LOGS_PANEL_ITEM(groupedLogs));
                }
            }
        } catch (e) {
            // Remove log when polling error is handled in some way instead of being ignored
            console.error("Error occurred in pollProcessLogs:", e);
            // Propagate the original error so callers can inspect what went wrong
            throw e;
        }
    };
/**
 * Lists the log files of a container request that the logs panel can display.
 * @param containerRequest Container request whose log collection is listed
 * @param logService Service used to list the log files
 * @returns Files of a displayable log event type that contain at least one byte
 */
const loadContainerLogFileList = async (containerRequest: ContainerRequestResource, logService: LogService) => {
    const listing = await logService.listLogFiles(containerRequest);

    // Filter only root directory files matching log event types which have bytes
    // NOTE(review): the size condition is reconstructed from the comment above — confirm.
    return listing.filter((entry): entry is CollectionFile => (
        entry.type === CollectionFileType.FILE &&
        PROCESS_PANEL_LOG_EVENT_TYPES.includes(logFileToLogType(entry)) &&
        entry.size > 0
    ));
};
/**
 * Loads the contents of each file from each file's lastByte simultaneously
 * while respecting the maxLogFetchSize by requesting the start and end
 * of the desired block and inserting a snipline.
 *
 * Files whose requests fail, or for which any returned fragment is empty, are
 * left out of the result. If every request fails, the returned promise rejects
 * with the reason of the first failure.
 *
 * @param logFilesWithProgress CollectionFiles with the last byte previously loaded
 * @param logService Service used to perform the ranged log requests
 * @param process Process whose container request owns the log files
 * @returns LogFragment[] containing a single LogFragment corresponding to each input file
 */
const loadContainerLogFileContents = async (logFilesWithProgress: FileWithProgress[], logService: LogService, process: Process): Promise<LogFragment[]> => {
    const requests = logFilesWithProgress
        .filter(({ file }) => file.size > 0)
        .map(({ file, lastByte }): Promise<LogFragment[]> => {
            const requestSize = file.size - lastByte;
            if (requestSize > maxLogFetchSize) {
                // Too much to fetch at once: request only the head and the tail of the range
                const chunkSize = Math.floor(maxLogFetchSize / 2);
                const firstChunkEnd = lastByte + chunkSize - 1;
                return Promise.all([
                    logService.getLogFileContents(process.containerRequest, file, lastByte, firstChunkEnd),
                    logService.getLogFileContents(process.containerRequest, file, file.size - chunkSize, file.size - 1)
                ] as Promise<(LogFragment)>[]);
            }
            return Promise.all([logService.getLogFileContents(process.containerRequest, file, lastByte, file.size - 1)]);
        });

    const settled = await Promise.allSettled(requests);

    if (settled.length && settled.every((result) => result.status === 'rejected')) {
        // Since allSettled does not pass promise rejection we reject with the
        // underlying reason (not the allSettled wrapper) if every request failed
        const firstRejection = settled.find(
            (result): result is PromiseRejectedResult => result.status === 'rejected'
        );
        return Promise.reject(firstRejection?.reason);
    }

    return settled
        .filter((result): result is PromiseFulfilledResult<LogFragment[]> => (
            // Filter out log files with rejected promises
            // (Promise.all rejects on any failure)
            result.status === 'fulfilled' &&
            // Filter out files where any fragment is empty
            // (prevent incorrect snipline generation or an un-resumable situation)
            result.value.every((logFragment) => logFragment.contents.length > 0)
        ))
        .map(({ value: fragments }): LogFragment => {
            // For any multi fragment response set, modify the last line of non-final chunks to include a line break and snip line
            // Don't add snip line as a separate line so that sorting won't reorder it
            fragments.slice(0, -1).forEach((fragment) => {
                const lastLineIndex = fragment.contents.length - 1;
                fragment.contents[lastLineIndex] = `${fragment.contents[lastLineIndex]}\n${SNIPLINE}`;
            });

            // Merge the fragments of one file into a single LogFragment
            return {
                logType: fragments[fragments.length - 1].logType,
                contents: fragments.flatMap((fragment) => fragment.contents),
            };
        });
};
/**
 * Builds the payload used to initialise the process logs panel.
 * @param logFiles Log files the fragments were loaded from
 * @param logFragments Loaded log contents, one fragment per file
 * @returns The grouped logs together with their keys as the available filters
 */
const createInitialLogPanelState = (logFiles: CollectionFile[], logFragments: LogFragment[]): {filters: string[], logs: ProcessLogs} => {
    const groupedLogs = groupLogs(logFiles, logFragments);
    return {
        filters: Object.keys(groupedLogs),
        logs: groupedLogs,
    };
};
/**
 * Converts LogFragments into ProcessLogs, grouping and sorting All/Main logs
 * @param logFiles Log files used to look up the last byte of each log type
 * @param logFragments Loaded log contents, one fragment per file
 * @returns ProcessLogs for the store
 */
const groupLogs = (logFiles: CollectionFile[], logFragments: LogFragment[]): ProcessLogs => {
    // The combined views are built from copies with multi-line entries merged
    const sortableLogFragments = mergeMultilineLoglines(logFragments);

    const allLogs = mergeSortLogFragments(sortableLogFragments);
    const mainFragments = sortableLogFragments.filter((fragment) => MAIN_EVENT_TYPES.includes(fragment.logType));
    const mainLogs = mergeSortLogFragments(mainFragments);

    // Per-type entries keep the contents of the fragments exactly as passed in
    const logsByType: Record<string, { lastByte: number | undefined, contents: string[] }> = {};
    for (const fragment of logFragments) {
        logsByType[fragment.logType as string] = {
            lastByte: fetchLastByteNumber(logFiles, fragment.logType),
            contents: fragment.contents,
        };
    }

    return {
        [MAIN_FILTER_TYPE]: {lastByte: undefined, contents: mainLogs},
        [ALL_FILTER_TYPE]: {lastByte: undefined, contents: allLogs},
        ...logsByType,
    };
};
/**
 * Checks for non-timestamped log lines and merges them with the previous line, assumes they are multi-line logs
 * If there is no previous line (first line has no timestamp), the line is deleted.
 * Only used for combined logs that need sorting by timestamp after merging
 *
 * Fragments of a non-sorted log type are copied without modification. The
 * input fragments and their contents arrays are never mutated.
 *
 * @param logFragments Fragments to process
 * @returns Modified LogFragment[]
 */
const mergeMultilineLoglines = (logFragments: LogFragment[]) => (
    logFragments.map((fragment): LogFragment => {
        if (NON_SORTED_LOG_TYPES.includes(fragment.logType)) {
            // Not sorted by timestamp later on: hand back an untouched copy
            return { logType: fragment.logType, contents: [...fragment.contents] };
        }

        // Single pass: build the merged list instead of splicing in place
        const mergedLines: string[] = [];
        for (const line of fragment.contents) {
            if (LOG_TIMESTAMP_PATTERN.test(line)) {
                mergedLines.push(line);
            } else if (mergedLines.length > 0) {
                // Partial line without timestamp: attach it to the previous line
                const previousIndex = mergedLines.length - 1;
                mergedLines[previousIndex] = `${mergedLines[previousIndex]}\n${line}`;
            }
            // A non-timestamped line with no previous line is dropped
        }

        return { logType: fragment.logType, contents: mergedLines };
    })
);
/**
 * Merges log lines of different types and sorts types that contain timestamps (are sortable)
 *
 * Lines of non-sorted log types come first, grouped by log type in log type
 * order and otherwise left in their original order; the lines of all other
 * types follow, sorted with sortLogLines.
 *
 * @param logFragments Fragments to merge
 * @returns string[] of merged and sorted log lines
 */
const mergeSortLogFragments = (logFragments: LogFragment[]): string[] => {
    const isNonSorted = (fragment: LogFragment) => NON_SORTED_LOG_TYPES.includes(fragment.logType);

    // Timestamped lines from every sortable fragment, ordered line by line
    const timestampedLines = fragmentsToLines(logFragments.filter((fragment) => !isNonSorted(fragment)));
    timestampedLines.sort(sortLogLines);

    // Non-sortable fragments are only ordered relative to each other
    const nonSortedFragments = logFragments.filter(isNonSorted);
    nonSortedFragments.sort((left, right) => left.logType.localeCompare(right.logType));

    return fragmentsToLines(nonSortedFragments).concat(timestampedLines);
};
/**
 * Comparator for log lines, ordering them with String.prototype.localeCompare.
 * @returns Negative, zero or positive number following the Array.prototype.sort contract
 */
const sortLogLines = (a: string, b: string) => a.localeCompare(b);
/**
 * Flattens the contents of the given fragments into one list of lines,
 * keeping fragment order and line order.
 *
 * Lines are never passed as spread call arguments, so fragments with a very
 * large number of lines cannot exceed the engine's argument limit.
 *
 * @param fragments Fragments whose lines should be collected
 * @returns New array holding every line of every fragment
 */
const fragmentsToLines = (fragments: LogFragment[]): string[] => (
    fragments.flatMap((fragment: LogFragment) => fragment.contents)
);
/**
 * Looks up the size of the first log file whose name starts with the given key.
 * @param logFiles Files to search, in order
 * @param key Prefix the file name has to start with
 * @returns Size of the first matching file, or undefined when no file matches
 */
const fetchLastByteNumber = (logFiles: CollectionFile[], key: string) => {
    for (const candidate of logFiles) {
        if (candidate.name.startsWith(key)) {
            return candidate.size;
        }
    }
    return undefined;
};
/**
 * Navigates to a log collection after checking that it can be fetched.
 * Shows an error toast instead when fetching or navigating fails.
 * @param uuid UUID of the collection to open
 * @returns Thunk performing the check and the navigation
 */
export const navigateToLogCollection = (uuid: string) =>
    async (dispatch: Dispatch<any>, getState: () => RootState, services: ServiceRepository) => {
        const { collectionService } = services;
        try {
            // Request the collection first so navigation only happens when it is accessible
            await collectionService.get(uuid);
            dispatch<any>(navigateTo(uuid));
        } catch {
            const failureToast = snackbarActions.OPEN_SNACKBAR({ message: 'Could not request collection', hideDuration: 2000, kind: SnackbarKind.ERROR });
            dispatch(failureToast);
        }
    };
/** Key of the combined entry that holds the lines of every log type. */
const ALL_FILTER_TYPE = 'All logs';

/** Key of the combined entry that holds only the lines of the main event types. */
const MAIN_FILTER_TYPE = 'Main logs';
293 const MAIN_EVENT_TYPES = [
294 LogEventType.CRUNCH_RUN,
299 const PROCESS_PANEL_LOG_EVENT_TYPES = [
300 LogEventType.ARV_MOUNT,
301 LogEventType.CRUNCH_RUN,
302 LogEventType.CRUNCHSTAT,
303 LogEventType.DISPATCH,
304 LogEventType.HOSTSTAT,
305 LogEventType.NODE_INFO,
308 LogEventType.CONTAINER,
309 LogEventType.KEEPSTORE,
312 const NON_SORTED_LOG_TYPES = [
313 LogEventType.NODE_INFO,
314 LogEventType.CONTAINER,