// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0
5 import { unionize, ofType, UnionOf } from "common/unionize";
6 import { ProcessLogs } from './process-logs-panel';
7 import { LogEventType } from 'models/log';
8 import { RootState } from 'store/store';
9 import { ServiceRepository } from 'services/services';
10 import { Dispatch } from 'redux';
11 import { LogFragment, LogService, logFileToLogType } from 'services/log-service/log-service';
12 import { Process, getProcess } from 'store/processes/process';
13 import { navigateTo } from 'store/navigation/navigation-action';
14 import { snackbarActions, SnackbarKind } from 'store/snackbar/snackbar-actions';
15 import { CollectionFile, CollectionFileType } from "models/collection-file";
16 import { ContainerRequestResource, ContainerRequestState } from "models/container-request";
// Marker appended onto an existing log line where a range of log data was
// skipped (see loadContainerLogFileContents) — kept on one line so sorting
// cannot reorder it away from the line it annotates.
const SNIPLINE = `================ ✀ ================ ✀ ========= Some log(s) were skipped ========= ✀ ================ ✀ ================`;
// Nanosecond-precision UTC timestamp prefix (e.g. 2024-01-02T03:04:05.123456789Z)
// expected at the start of each sortable log line.
const LOG_TIMESTAMP_PATTERN = /^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}\.[0-9]{9}Z/;
21 export const processLogsPanelActions = unionize({
22 RESET_PROCESS_LOGS_PANEL: ofType<{}>(),
23 INIT_PROCESS_LOGS_PANEL: ofType<{ filters: string[], logs: ProcessLogs }>(),
24 SET_PROCESS_LOGS_PANEL_FILTER: ofType<string>(),
25 ADD_PROCESS_LOGS_PANEL_ITEM: ofType<ProcessLogs>(),
28 // Max size of logs to fetch in bytes
29 const maxLogFetchSize: number = 128 * 1000;
31 type FileWithProgress = {
37 logType: LogEventType,
42 export type ProcessLogsPanelAction = UnionOf<typeof processLogsPanelActions>;
44 export const setProcessLogsPanelFilter = (filter: string) =>
45 processLogsPanelActions.SET_PROCESS_LOGS_PANEL_FILTER(filter);
47 export const initProcessLogsPanel = (processUuid: string) =>
48 async (dispatch: Dispatch, getState: () => RootState, { logService }: ServiceRepository) => {
49 let process: Process | undefined;
51 dispatch(processLogsPanelActions.RESET_PROCESS_LOGS_PANEL());
52 process = getProcess(processUuid)(getState().resources);
53 if (process?.containerRequest?.uuid) {
54 // Get log file size info
55 const logFiles = await loadContainerLogFileList(process.containerRequest, logService);
57 // Populate lastbyte 0 for each file
58 const filesWithProgress = logFiles.map((file) => ({ file, lastByte: 0 }));
60 // Fetch array of LogFragments
61 const logLines = await loadContainerLogFileContents(filesWithProgress, logService, process);
63 // Populate initial state with filters
64 const initialState = createInitialLogPanelState(logFiles, logLines);
65 dispatch(processLogsPanelActions.INIT_PROCESS_LOGS_PANEL(initialState));
68 // On error, populate empty state to allow polling to start
69 const initialState = createInitialLogPanelState([], []);
70 dispatch(processLogsPanelActions.INIT_PROCESS_LOGS_PANEL(initialState));
71 // Only show toast on errors other than 404 since 404 is expected when logs do not exist yet
72 if (e.status !== 404) {
73 dispatch(snackbarActions.OPEN_SNACKBAR({ message: 'Error loading process logs', hideDuration: 4000, kind: SnackbarKind.ERROR }));
75 if (e.status === 404 && process?.containerRequest.state === ContainerRequestState.FINAL) {
76 dispatch(snackbarActions.OPEN_SNACKBAR({ message: 'Log collection was trashed or deleted.', hideDuration: 4000, kind: SnackbarKind.WARNING }));
81 export const pollProcessLogs = (processUuid: string) =>
82 async (dispatch: Dispatch, getState: () => RootState, { logService }: ServiceRepository) => {
84 // Get log panel state and process from store
85 const currentState = getState().processLogsPanel;
86 const process = getProcess(processUuid)(getState().resources);
88 // Check if container request is present and initial logs state loaded
89 if (process?.containerRequest?.uuid && Object.keys(currentState.logs).length > 0) {
90 const logFiles = await loadContainerLogFileList(process.containerRequest, logService);
92 // Determine byte to fetch from while filtering unchanged files
93 const filesToUpdateWithProgress = logFiles.reduce((acc, updatedFile) => {
94 // Fetch last byte or 0 for new log files
95 const currentStateLogLastByte = currentState.logs[logFileToLogType(updatedFile)]?.lastByte || 0;
97 const isNew = !Object.keys(currentState.logs).find((currentStateLogName) => (updatedFile.name.startsWith(currentStateLogName)));
98 const isChanged = !isNew && currentStateLogLastByte < updatedFile.size;
100 if (isNew || isChanged) {
101 return acc.concat({ file: updatedFile, lastByte: currentStateLogLastByte });
105 }, [] as FileWithProgress[]);
107 // Perform range request(s) for each file
108 const logFragments = await loadContainerLogFileContents(filesToUpdateWithProgress, logService, process);
110 if (logFragments.length) {
111 // Convert LogFragments to ProcessLogs with All/Main sorting & line-merging
112 const groupedLogs = groupLogs(logFiles, logFragments);
113 await dispatch(processLogsPanelActions.ADD_PROCESS_LOGS_PANEL_ITEM(groupedLogs));
116 return Promise.resolve();
118 // Remove log when polling error is handled in some way instead of being ignored
119 console.error("Error occurred in pollProcessLogs:", e);
120 return Promise.reject();
124 const loadContainerLogFileList = async (containerRequest: ContainerRequestResource, logService: LogService) => {
125 const logCollectionContents = await logService.listLogFiles(containerRequest);
127 // Filter only root directory files matching log event types which have bytes
128 return logCollectionContents.filter((file): file is CollectionFile => (
129 file.type === CollectionFileType.FILE &&
130 PROCESS_PANEL_LOG_EVENT_TYPES.indexOf(logFileToLogType(file)) > -1 &&
136 * Loads the contents of each file from each file's lastByte simultaneously
137 * while respecting the maxLogFetchSize by requesting the start and end
138 * of the desired block and inserting a snipline.
139 * @param logFilesWithProgress CollectionFiles with the last byte previously loaded
142 * @returns LogFragment[] containing a single LogFragment corresponding to each input file
144 const loadContainerLogFileContents = async (logFilesWithProgress: FileWithProgress[], logService: LogService, process: Process) => (
145 (await Promise.allSettled(logFilesWithProgress.filter(({ file }) => file.size > 0).map(({ file, lastByte }) => {
146 const requestSize = file.size - lastByte;
147 if (requestSize > maxLogFetchSize) {
148 const chunkSize = Math.floor(maxLogFetchSize / 2);
149 const firstChunkEnd = lastByte + chunkSize - 1;
151 logService.getLogFileContents(process.containerRequest, file, lastByte, firstChunkEnd),
152 logService.getLogFileContents(process.containerRequest, file, file.size - chunkSize, file.size - 1)
153 ] as Promise<(LogFragment)>[]);
155 return Promise.all([logService.getLogFileContents(process.containerRequest, file, lastByte, file.size - 1)]);
158 if (res.length && res.every(promiseResult => (promiseResult.status === 'rejected'))) {
159 // Since allSettled does not pass promise rejection we throw an
160 // error if every request failed
161 const error = res.find(
162 (promiseResult): promiseResult is PromiseRejectedResult => promiseResult.status === 'rejected'
164 return Promise.reject(error);
166 return res.filter((promiseResult): promiseResult is PromiseFulfilledResult<LogFragment[]> => (
167 // Filter out log files with rejected promises
168 // (Promise.all rejects on any failure)
169 promiseResult.status === 'fulfilled' &&
170 // Filter out files where any fragment is empty
171 // (prevent incorrect snipline generation or an un-resumable situation)
172 !!promiseResult.value.every(logFragment => logFragment.contents.length)
173 )).map(one => one.value)
174 })).map((logResponseSet) => {
175 // For any multi fragment response set, modify the last line of non-final chunks to include a line break and snip line
176 // Don't add snip line as a separate line so that sorting won't reorder it
177 for (let i = 1; i < logResponseSet.length; i++) {
178 const fragment = logResponseSet[i - 1];
179 const lastLineIndex = fragment.contents.length - 1;
180 const lastLineContents = fragment.contents[lastLineIndex];
181 const newLastLine = `${lastLineContents}\n${SNIPLINE}`;
183 logResponseSet[i - 1].contents[lastLineIndex] = newLastLine;
186 // Merge LogFragment Array (representing multiple log line arrays) into single LogLine[] / LogFragment
187 return logResponseSet.reduce((acc, curr: LogFragment) => ({
188 logType: curr.logType,
189 contents: [...(acc.contents || []), ...curr.contents]
190 }), {} as LogFragment);
194 const createInitialLogPanelState = (logFiles: CollectionFile[], logFragments: LogFragment[]): { filters: string[], logs: ProcessLogs } => {
195 const logs = groupLogs(logFiles, logFragments);
196 const filters = Object.keys(logs);
197 return { filters, logs };
201 * Converts LogFragments into ProcessLogs, grouping and sorting All/Main logs
203 * @param logFragments
204 * @returns ProcessLogs for the store
206 const groupLogs = (logFiles: CollectionFile[], logFragments: LogFragment[]): ProcessLogs => {
207 const sortableLogFragments = mergeMultilineLoglines(logFragments);
209 const allLogs = mergeSortLogFragments(sortableLogFragments);
210 const mainLogs = mergeSortLogFragments(sortableLogFragments.filter((fragment) => (MAIN_EVENT_TYPES.includes(fragment.logType))));
212 const groupedLogs = logFragments.reduce((grouped, fragment) => ({
214 [fragment.logType as string]: { lastByte: fetchLastByteNumber(logFiles, fragment.logType), contents: fragment.contents }
218 [MAIN_FILTER_TYPE]: { lastByte: undefined, contents: mainLogs },
219 [ALL_FILTER_TYPE]: { lastByte: undefined, contents: allLogs },
225 * Checks for non-timestamped log lines and merges them with the previous line, assumes they are multi-line logs
226 * If there is no previous line (first line has no timestamp), the line is deleted.
227 * Only used for combined logs that need sorting by timestamp after merging
228 * @param logFragments
229 * @returns Modified LogFragment[]
231 const mergeMultilineLoglines = (logFragments: LogFragment[]) => (
232 logFragments.map((fragment) => {
233 // Avoid altering the original fragment copy
234 let fragmentCopy: LogFragment = {
235 logType: fragment.logType,
236 contents: [...fragment.contents],
238 // Merge any non-timestamped lines in sortable log types with previous line
239 if (fragmentCopy.contents.length && !NON_SORTED_LOG_TYPES.includes(fragmentCopy.logType)) {
240 for (let i = 0; i < fragmentCopy.contents.length; i++) {
241 const lineContents = fragmentCopy.contents[i];
242 if (!lineContents.match(LOG_TIMESTAMP_PATTERN)) {
243 // Partial line without timestamp detected
245 // If not first line, copy line to previous line
246 const previousLineContents = fragmentCopy.contents[i - 1];
247 const newPreviousLineContents = `${previousLineContents}\n${lineContents}`;
248 fragmentCopy.contents[i - 1] = newPreviousLineContents;
250 // Delete the current line and prevent iterating
251 fragmentCopy.contents.splice(i, 1);
261 * Merges log lines of different types and sorts types that contain timestamps (are sortable)
262 * @param logFragments
263 * @returns string[] of merged and sorted log lines
265 const mergeSortLogFragments = (logFragments: LogFragment[]): string[] => {
266 const sortableFragments = logFragments
267 .filter((fragment) => (!NON_SORTED_LOG_TYPES.includes(fragment.logType)));
269 const nonSortableLines = fragmentsToLines(logFragments
270 .filter((fragment) => (NON_SORTED_LOG_TYPES.includes(fragment.logType)))
271 .sort((a, b) => (a.logType.localeCompare(b.logType))));
273 return [...nonSortableLines, ...sortLogFragments(sortableFragments)];
277 * Performs merge and sort of input log fragment lines
278 * @param logFragments set of sortable log fragments to be merged and sorted
279 * @returns A string array containing all lines, sorted by timestamp and
280 * preserving line ordering and type grouping when timestamps match
282 const sortLogFragments = (logFragments: LogFragment[]): string[] => {
283 const linesWithType: SortableLine[] = logFragments
284 // Map each logFragment into an array of SortableLine
285 .map((fragment: LogFragment): SortableLine[] => (
286 fragment.contents.map((singleLine: string) => {
287 const timestampMatch = singleLine.match(LOG_TIMESTAMP_PATTERN);
288 const timestamp = timestampMatch && timestampMatch[0] ? timestampMatch[0] : "";
290 logType: fragment.logType,
291 timestamp: timestamp,
292 contents: singleLine,
295 // Merge each array of SortableLine into single array
296 )).reduce((acc: SortableLine[], lines: SortableLine[]) => (
298 ), [] as SortableLine[]);
301 .sort(sortableLineSortFunc)
302 .map(lineWithType => lineWithType.contents);
306 * Sort func to sort lines
307 * Preserves original ordering of lines from the same source
308 * Stably orders lines of differing type but same timestamp
309 * (produces a block of same-timestamped lines of one type before a block
310 * of same timestamped lines of another type for readability)
311 * Sorts all other lines by contents (ie by timestamp)
313 const sortableLineSortFunc = (a: SortableLine, b: SortableLine) => {
314 if (a.logType === b.logType) {
316 } else if (a.timestamp === b.timestamp) {
317 return a.logType.localeCompare(b.logType);
319 return a.contents.localeCompare(b.contents);
323 const fragmentsToLines = (fragments: LogFragment[]): string[] => (
324 fragments.reduce((acc, fragment: LogFragment) => (
325 acc.concat(...fragment.contents)
329 const fetchLastByteNumber = (logFiles: CollectionFile[], key: string) => {
330 return logFiles.find((file) => (file.name.startsWith(key)))?.size
333 export const navigateToLogCollection = (uuid: string) =>
334 async (dispatch: Dispatch<any>, getState: () => RootState, services: ServiceRepository) => {
336 await services.collectionService.get(uuid);
337 dispatch<any>(navigateTo(uuid));
339 dispatch(snackbarActions.OPEN_SNACKBAR({ message: 'Log collection was trashed or deleted.', hideDuration: 4000, kind: SnackbarKind.WARNING }));
343 const ALL_FILTER_TYPE = 'All logs';
345 const MAIN_FILTER_TYPE = 'Main logs';
346 const MAIN_EVENT_TYPES = [
347 LogEventType.CRUNCH_RUN,
352 const PROCESS_PANEL_LOG_EVENT_TYPES = [
353 LogEventType.ARV_MOUNT,
354 LogEventType.CRUNCH_RUN,
355 LogEventType.CRUNCHSTAT,
356 LogEventType.DISPATCH,
357 LogEventType.HOSTSTAT,
358 LogEventType.NODE_INFO,
361 LogEventType.CONTAINER,
362 LogEventType.KEEPSTORE,
365 const NON_SORTED_LOG_TYPES = [
366 LogEventType.NODE_INFO,
367 LogEventType.CONTAINER,